From 8c2323dc8d0e107f1a99898370c7515fa9603122 Mon Sep 17 00:00:00 2001 From: Joseph Percivall Date: Mon, 2 Nov 2015 15:45:20 -0500 Subject: [PATCH] NIFI-1086 Provide refactoring of InvokeHTTP NIFI-980 Add support for HTTP Digest authentication to InvokeHttp NIFI-1080 Provide additional InvokeHttp unit tests NIFI-1133 InvokeHTTP Processor does not save Location header for 3xx responses NIFI-1009 InvokeHTTP should be able to be scheduled without any incoming connection for GET operations NIFI-61 Multiple improvements for InvokeHTTP inclusive of providing unique tx.id across clusters, dynamic HTTP header properties Signed-off-by: Aldrin Piri --- .../nifi-standard-processors/pom.xml | 52 +- .../nifi/processors/standard/InvokeHTTP.java | 1235 +++++++++-------- .../processors/standard/TestInvokeHTTP.java | 91 +- .../standard/TestInvokeHttpSSL.java | 3 +- .../standard/util/TestInvokeHttpCommon.java | 1031 +++++++++++--- pom.xml | 38 + 6 files changed, 1662 insertions(+), 788 deletions(-) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml index 76f9daf323..0427927ba7 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml @@ -124,11 +124,6 @@ language governing permissions and limitations under the License. --> net.sf.saxon Saxon-HE - - org.apache.nifi - nifi-mock - test - org.apache.nifi nifi-socket-utils @@ -137,11 +132,6 @@ language governing permissions and limitations under the License. --> org.apache.nifi nifi-load-distribution-service-api - - org.apache.nifi - nifi-distributed-cache-client-service - test - joda-time joda-time @@ -154,20 +144,10 @@ language governing permissions and limitations under the License. --> org.apache.activemq activemq-client - - org.apache.activemq - activemq-broker - test - com.jayway.jsonpath json-path - - org.apache.nifi - nifi-ssl-context-service - test - org.apache.tika tika-core @@ -189,7 +169,37 @@ language governing permissions and limitations under the License. --> org.codehaus.jackson jackson-mapper-asl - + + com.squareup.okhttp + okhttp + 2.5.0 + + + com.burgstaller + okhttp-digest + 0.4 + jar + + + org.apache.nifi + nifi-mock + test + + + org.apache.nifi + nifi-distributed-cache-client-service + test + + + org.apache.activemq + activemq-broker + test + + + org.apache.nifi + nifi-ssl-context-service + test + org.apache.derby derby diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java index d827658a40..2a9760d456 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java @@ -18,13 +18,8 @@ package org.apache.nifi.processors.standard; import static org.apache.commons.lang3.StringUtils.trimToEmpty; -import java.io.BufferedInputStream; -import java.io.BufferedOutputStream; -import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; -import java.io.OutputStream; -import java.net.HttpURLConnection; import java.net.InetSocketAddress; import java.net.Proxy; import java.net.Proxy.Type; @@ -41,30 +36,49 @@ import java.util.List; import java.util.Locale; import java.util.Map; import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import java.util.regex.Matcher; import java.util.regex.Pattern; import javax.net.ssl.HostnameVerifier; -import javax.net.ssl.HttpsURLConnection; import javax.net.ssl.SSLContext; import javax.net.ssl.SSLSession; -import org.apache.commons.codec.binary.Base64; +import com.burgstaller.okhttp.AuthenticationCacheInterceptor; +import com.burgstaller.okhttp.CachingAuthenticatorDecorator; +import com.burgstaller.okhttp.DispatchingAuthenticator; +import com.burgstaller.okhttp.digest.CachingAuthenticator; +import com.burgstaller.okhttp.digest.DigestAuthenticator; + +import com.squareup.okhttp.MediaType; +import com.squareup.okhttp.OkHttpClient; +import com.squareup.okhttp.Request; +import com.squareup.okhttp.RequestBody; +import com.squareup.okhttp.Response; + +import com.squareup.okhttp.ResponseBody; +import okio.BufferedSink; +import org.apache.commons.io.input.TeeInputStream; import org.apache.commons.lang3.StringUtils; import org.apache.nifi.annotation.behavior.DynamicProperty; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.TriggerWhenEmpty; import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.expression.AttributeExpression; import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.logging.ProcessorLog; import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.ProcessContext; @@ -72,118 +86,61 @@ import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.standard.util.SoftLimitBoundedByteArrayOutputStream; import org.apache.nifi.ssl.SSLContextService; import org.apache.nifi.ssl.SSLContextService.ClientAuth; +import org.apache.nifi.stream.io.StreamUtils; import org.joda.time.format.DateTimeFormat; import org.joda.time.format.DateTimeFormatter; @SupportsBatching @Tags({"http", "https", "rest", "client"}) -@InputRequirement(Requirement.INPUT_REQUIRED) +@InputRequirement(Requirement.INPUT_ALLOWED) +@TriggerWhenEmpty @CapabilityDescription("An HTTP client processor which converts FlowFile attributes to HTTP headers, with configurable HTTP method, url, etc.") @WritesAttributes({ @WritesAttribute(attribute = "invokehttp.status.code", description = "The status code that is returned"), @WritesAttribute(attribute = "invokehttp.status.message", description = "The status message that is returned"), - @WritesAttribute(attribute = "invokehttp.response.body", description = "The response body"), + @WritesAttribute(attribute = "invokehttp.response.body", description = "In the instance where the status code received is not a success (2xx) " + + "then the response body will be put to the 'invokehttp.response.body' attribute of the request FlowFile."), @WritesAttribute(attribute = "invokehttp.request.url", description = "The request URL"), @WritesAttribute(attribute = "invokehttp.tx.id", description = "The transaction ID that is returned after reading the response"), - @WritesAttribute(attribute = "invokehttp.remote.dn", description = "The DN of the remote server")}) -@DynamicProperty(name = "Trusted Hostname", value = "A hostname", description = "Bypass the normal truststore hostname verifier to allow the specified (single) remote hostname as trusted " - + "Enabling this property has MITM security implications, use wisely. Only valid with SSL (HTTPS) connections.") + @WritesAttribute(attribute = "invokehttp.remote.dn", description = "The DN of the remote server"), + @WritesAttribute(attribute = "user-defined", description = "If the 'Put Response Body In Attribute' property is set then whatever it is set to " + + "will become the attribute key and the value would be the body of the HTTP response.")}) +@DynamicProperty(name = "Header Name", value = "Attribute Expression Language", supportsExpressionLanguage = true, description = "Send request header " + + "with a key matching the Dynamic Property Key and a value created by evaluating the Attribute Expression Language set in the value " + + "of the Dynamic Property.") public final class InvokeHTTP extends AbstractProcessor { - @Override - protected List getSupportedPropertyDescriptors() { - return Config.PROPERTIES; - } + // flowfile attribute keys returned after reading the response + public final static String STATUS_CODE = "invokehttp.status.code"; + public final static String STATUS_MESSAGE = "invokehttp.status.message"; + public final static String RESPONSE_BODY = "invokehttp.response.body"; + public final static String REQUEST_URL = "invokehttp.request.url"; + public final static String TRANSACTION_ID = "invokehttp.tx.id"; + public final static String REMOTE_DN = "invokehttp.remote.dn"; - @Override - protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(String propertyDescriptorName) { - if (Config.PROP_TRUSTED_HOSTNAME.getName().equalsIgnoreCase(propertyDescriptorName)) { - return Config.PROP_TRUSTED_HOSTNAME; - } - return super.getSupportedDynamicPropertyDescriptor(propertyDescriptorName); - } - - @Override - public Set getRelationships() { - return Config.RELATIONSHIPS; - } - - private volatile Pattern attributesToSend = null; - - @Override - public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) { - final String trimmedValue = StringUtils.trimToEmpty(newValue); - - // compile the attributes-to-send filter pattern - if (Config.PROP_ATTRIBUTES_TO_SEND.getName().equalsIgnoreCase(descriptor.getName())) { - if (newValue.isEmpty()) { - attributesToSend = null; - } else { - attributesToSend = Pattern.compile(trimmedValue); - } - } - } - - @Override - protected Collection customValidate(final ValidationContext validationContext) { - final List results = new ArrayList<>(1); - final boolean proxyHostSet = validationContext.getProperty(Config.PROP_PROXY_HOST).isSet(); - final boolean proxyPortSet = validationContext.getProperty(Config.PROP_PROXY_PORT).isSet(); - - if ((proxyHostSet && !proxyPortSet) || (!proxyHostSet && proxyPortSet)) { - results.add(new ValidationResult.Builder().subject("Proxy Host and Port").valid(false).explanation("If Proxy Host or Proxy Port is set, both must be set").build()); - } - - return results; - } - - @Override - public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { - final FlowFile flowFile = session.get(); - if (flowFile == null) { - return; - } - - final SSLContextService sslService = context.getProperty(Config.PROP_SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); - final SSLContext sslContext = sslService == null ? null : sslService.createSSLContext(ClientAuth.NONE); - - Transaction transaction = new Transaction(getLogger(), sslContext, attributesToSend, context, session, flowFile); - transaction.process(); - } - - /** - * Stores properties, relationships, configuration values, hard coded strings, magic numbers, etc. - */ - public interface Config { - // flowfile attribute keys returned after reading the response - String STATUS_CODE = "invokehttp.status.code"; - String STATUS_MESSAGE = "invokehttp.status.message"; - String RESPONSE_BODY = "invokehttp.response.body"; - String REQUEST_URL = "invokehttp.request.url"; - String TRANSACTION_ID = "invokehttp.tx.id"; - String REMOTE_DN = "invokehttp.remote.dn"; - - // Set of flowfile attributes which we generally always ignore during - // processing, including when converting http headers, copying attributes, etc. - // This set includes our strings defined above as well as some standard flowfile - // attributes. - public static final Set IGNORED_ATTRIBUTES = Collections.unmodifiableSet(new HashSet<>(Arrays.asList( + // Set of flowfile attributes which we generally always ignore during + // processing, including when converting http headers, copying attributes, etc. + // This set includes our strings defined above as well as some standard flowfile + // attributes. + public static final Set IGNORED_ATTRIBUTES = Collections.unmodifiableSet(new HashSet<>(Arrays.asList( STATUS_CODE, STATUS_MESSAGE, RESPONSE_BODY, REQUEST_URL, TRANSACTION_ID, REMOTE_DN, "uuid", "filename", "path"))); - // properties - public static final PropertyDescriptor PROP_METHOD = new PropertyDescriptor.Builder() + // properties + public static final PropertyDescriptor PROP_METHOD = new PropertyDescriptor.Builder() .name("HTTP Method") - .description("HTTP request method (GET, POST, PUT, DELETE, HEAD, OPTIONS).") + .description("HTTP request method (GET, POST, PUT, DELETE, HEAD, OPTIONS). Arbitrary methods are also supported but will " + + "be sent without a message body.") .required(true) .defaultValue("GET") .expressionLanguageSupported(true) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING)) .build(); - public static final PropertyDescriptor PROP_URL = new PropertyDescriptor.Builder() + public static final PropertyDescriptor PROP_URL = new PropertyDescriptor.Builder() .name("Remote URL") .description("Remote URL which will be connected to, including scheme, host, port, path.") .required(true) @@ -191,7 +148,7 @@ public final class InvokeHTTP extends AbstractProcessor { .addValidator(StandardValidators.URL_VALIDATOR) .build(); - public static final PropertyDescriptor PROP_CONNECT_TIMEOUT = new PropertyDescriptor.Builder() + public static final PropertyDescriptor PROP_CONNECT_TIMEOUT = new PropertyDescriptor.Builder() .name("Connection Timeout") .description("Max wait time for connection to remote service.") .required(true) @@ -199,7 +156,7 @@ public final class InvokeHTTP extends AbstractProcessor { .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) .build(); - public static final PropertyDescriptor PROP_READ_TIMEOUT = new PropertyDescriptor.Builder() + public static final PropertyDescriptor PROP_READ_TIMEOUT = new PropertyDescriptor.Builder() .name("Read Timeout") .description("Max wait time for response from remote service.") .required(true) @@ -207,7 +164,7 @@ public final class InvokeHTTP extends AbstractProcessor { .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) .build(); - public static final PropertyDescriptor PROP_DATE_HEADER = new PropertyDescriptor.Builder() + public static final PropertyDescriptor PROP_DATE_HEADER = new PropertyDescriptor.Builder() .name("Include Date Header") .description("Include an RFC-2616 Date header in the request.") .required(true) @@ -216,7 +173,7 @@ public final class InvokeHTTP extends AbstractProcessor { .addValidator(StandardValidators.BOOLEAN_VALIDATOR) .build(); - public static final PropertyDescriptor PROP_FOLLOW_REDIRECTS = new PropertyDescriptor.Builder() + public static final PropertyDescriptor PROP_FOLLOW_REDIRECTS = new PropertyDescriptor.Builder() .name("Follow Redirects") .description("Follow HTTP redirects issued by remote server.") .required(true) @@ -225,48 +182,50 @@ public final class InvokeHTTP extends AbstractProcessor { .addValidator(StandardValidators.BOOLEAN_VALIDATOR) .build(); - public static final PropertyDescriptor PROP_ATTRIBUTES_TO_SEND = new PropertyDescriptor.Builder() + public static final PropertyDescriptor PROP_ATTRIBUTES_TO_SEND = new PropertyDescriptor.Builder() .name("Attributes to Send") .description("Regular expression that defines which attributes to send as HTTP headers in the request. " - + "If not defined, no attributes are sent as headers.") + + "If not defined, no attributes are sent as headers. Also any dynamic properties set will be sent as headers. " + + "The dynamic property key will be the header key and the dynamic property value will be interpreted as expression " + + "language will be the header value.") .required(false) .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR) .build(); - public static final PropertyDescriptor PROP_SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder() + public static final PropertyDescriptor PROP_SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder() .name("SSL Context Service") .description("The SSL Context Service used to provide client certificate information for TLS/SSL (https) connections.") .required(false) .identifiesControllerService(SSLContextService.class) .build(); - public static final PropertyDescriptor PROP_PROXY_HOST = new PropertyDescriptor.Builder() + public static final PropertyDescriptor PROP_PROXY_HOST = new PropertyDescriptor.Builder() .name("Proxy Host") .description("The fully qualified hostname or IP address of the proxy server") .required(false) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); - public static final PropertyDescriptor PROP_PROXY_PORT = new PropertyDescriptor.Builder() + public static final PropertyDescriptor PROP_PROXY_PORT = new PropertyDescriptor.Builder() .name("Proxy Port") .description("The port of the proxy server") .required(false) .addValidator(StandardValidators.PORT_VALIDATOR) .build(); - // Per RFC 7235, 2617, and 2616. - // basic-credentials = base64-user-pass - // base64-user-pass = userid ":" password - // userid = * - // password = *TEXT - // - // OCTET = - // CTL = - // LWS = [CRLF] 1*( SP | HT ) - // TEXT = - // - // Per RFC 7230, username & password in URL are now disallowed in HTTP and HTTPS URIs. - public static final PropertyDescriptor PROP_BASIC_AUTH_USERNAME = new PropertyDescriptor.Builder() + // Per RFC 7235, 2617, and 2616. + // basic-credentials = base64-user-pass + // base64-user-pass = userid ":" password + // userid = * + // password = *TEXT + // + // OCTET = + // CTL = + // LWS = [CRLF] 1*( SP | HT ) + // TEXT = + // + // Per RFC 7230, username & password in URL are now disallowed in HTTP and HTTPS URIs. + public static final PropertyDescriptor PROP_BASIC_AUTH_USERNAME = new PropertyDescriptor.Builder() .name("Basic Authentication Username") .displayName("Basic Authentication Username") .description("The username to be used by the client to authenticate against the Remote URL. Cannot include control characters (0-31), ':', or DEL (127).") @@ -274,7 +233,7 @@ public final class InvokeHTTP extends AbstractProcessor { .addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("^[\\x20-\\x39\\x3b-\\x7e\\x80-\\xff]+$"))) .build(); - public static final PropertyDescriptor PROP_BASIC_AUTH_PASSWORD = new PropertyDescriptor.Builder() + public static final PropertyDescriptor PROP_BASIC_AUTH_PASSWORD = new PropertyDescriptor.Builder() .name("Basic Authentication Password") .displayName("Basic Authentication Password") .description("The password to be used by the client to authenticate against the Remote URL.") @@ -283,7 +242,64 @@ public final class InvokeHTTP extends AbstractProcessor { .addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("^[\\x20-\\x7e\\x80-\\xff]+$"))) .build(); - public static final List PROPERTIES = Collections.unmodifiableList(Arrays.asList( + public static final PropertyDescriptor PROP_PUT_OUTPUT_IN_ATTRIBUTE = new PropertyDescriptor.Builder() + .name("Put Response Body In Attribute") + .description("If set, the response body received back will be put into an attribute of the original FlowFile instead of a separate " + + "FlowFile. The attribute key to put to is determined by evaluating value of this property. ") + .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING)) + .expressionLanguageSupported(true) + .build(); + + public static final PropertyDescriptor PROP_PUT_ATTRIBUTE_MAX_LENGTH = new PropertyDescriptor.Builder() + .name("Max Length To Put In Attribute") + .description("If routing the response body to an attribute of the original (by setting the \"Put response body in attribute\" " + + "property or by receiving an error status code), the number of characters put to the attribute value will be at " + + "most this amount. This is important because attributes are held in memory and large attributes will quickly " + + "cause out of memory issues. If the output goes longer than this value, it will be truncated to fit. " + + "Consider making this smaller if able.") + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .defaultValue("256") + .build(); + + public static final PropertyDescriptor PROP_DIGEST_AUTH = new PropertyDescriptor.Builder() + .name("Digest Authentication") + .displayName("Use Digest Authentication") + .description("Whether to communicate with the website using Digest Authentication. 'Basic Authentication Username' and 'Basic Authentication Password' are used " + + "for authentication.") + .required(false) + .defaultValue("false") + .allowableValues("true", "false") + .build(); + + public static final PropertyDescriptor PROP_OUTPUT_RESPONSE_REGARDLESS = new PropertyDescriptor.Builder() + .name("Always Output Response") + .description("Will force a response FlowFile to be generated and routed to the 'Response' relationship regardless of what the server status code received is " + + "or if the processor is configured to put the server response body in the request attribute. In the later configuration a request FlowFile with the " + + "response body in the attribute and a typical response FlowFile will be emitted to their respective relationships.") + .required(false) + .defaultValue("false") + .allowableValues("true", "false") + .build(); + + public static final PropertyDescriptor PROP_TRUSTED_HOSTNAME = new PropertyDescriptor.Builder() + .name("Trusted Hostname") + .description("Bypass the normal truststore hostname verifier to allow the specified remote hostname as trusted. " + + "Enabling this property has MITM security implications, use wisely. Will still accept other connections based " + + "on the normal truststore hostname verifier. Only valid with SSL (HTTPS) connections.") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .required(false) + .build(); + + public static final PropertyDescriptor PROP_ADD_HEADERS_TO_REQUEST = new PropertyDescriptor.Builder() + .name("Add Response Headers to Request") + .description("Enabling this property saves all the response headers to the original request. This may be when the response headers are needed " + + "but a response is not generated due to the status code received.") + .required(false) + .defaultValue("false") + .allowableValues("true", "false") + .build(); + + public static final List PROPERTIES = Collections.unmodifiableList(Arrays.asList( PROP_METHOD, PROP_URL, PROP_SSL_CONTEXT_SERVICE, @@ -295,499 +311,626 @@ public final class InvokeHTTP extends AbstractProcessor { PROP_BASIC_AUTH_USERNAME, PROP_BASIC_AUTH_PASSWORD, PROP_PROXY_HOST, - PROP_PROXY_PORT)); + PROP_PROXY_PORT, + PROP_PUT_OUTPUT_IN_ATTRIBUTE, + PROP_PUT_ATTRIBUTE_MAX_LENGTH, + PROP_DIGEST_AUTH, + PROP_OUTPUT_RESPONSE_REGARDLESS, + PROP_TRUSTED_HOSTNAME, + PROP_ADD_HEADERS_TO_REQUEST)); - // property to allow the hostname verifier to be overridden - // this is a "hidden" property - it's configured using a dynamic user property - public static final PropertyDescriptor PROP_TRUSTED_HOSTNAME = new PropertyDescriptor.Builder() - .name("Trusted Hostname") - .description("Bypass the normal truststore hostname verifier to allow the specified remote hostname as trusted. " - + "Enabling this property has MITM security implications, use wisely. Will still accept other connections based " - + "on the normal truststore hostname verifier. Only valid with SSL (HTTPS) connections.") - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .dynamic(true) - .build(); - - // relationships - public static final Relationship REL_SUCCESS_REQ = new Relationship.Builder() + // relationships + public static final Relationship REL_SUCCESS_REQ = new Relationship.Builder() .name("Original") - .description("Original FlowFile will be routed upon success (2xx status codes).") + .description("The original FlowFile will be routed upon success (2xx status codes). It will have new attributes detailing the " + + "success of the request.") .build(); - public static final Relationship REL_SUCCESS_RESP = new Relationship.Builder() + public static final Relationship REL_RESPONSE = new Relationship.Builder() .name("Response") - .description("Response FlowFile will be routed upon success (2xx status codes).") + .description("A Response FlowFile will be routed upon success (2xx status codes). If the 'Output Response Regardless' property " + + "is true then the response will be sent to this relationship regardless of the status code received.") .build(); - public static final Relationship REL_RETRY = new Relationship.Builder() + public static final Relationship REL_RETRY = new Relationship.Builder() .name("Retry") - .description("FlowFile will be routed on any status code that can be retried (5xx status codes).") + .description("The original FlowFile will be routed on any status code that can be retried (5xx status codes). It will have new " + + "attributes detailing the request.") .build(); - public static final Relationship REL_NO_RETRY = new Relationship.Builder() + public static final Relationship REL_NO_RETRY = new Relationship.Builder() .name("No Retry") - .description("FlowFile will be routed on any status code that should NOT be retried (1xx, 3xx, 4xx status codes).") + .description("The original FlowFile will be routed on any status code that should NOT be retried (1xx, 3xx, 4xx status codes). " + + "It will have new attributes detailing the request.") .build(); - public static final Relationship REL_FAILURE = new Relationship.Builder() + public static final Relationship REL_FAILURE = new Relationship.Builder() .name("Failure") - .description("FlowFile will be routed on any type of connection failure, timeout or general exception.") + .description("The original FlowFile will be routed on any type of connection failure, timeout or general exception. " + + "It will have new attributes detailing the request.") .build(); - public static final Set RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList( - REL_SUCCESS_REQ, REL_SUCCESS_RESP, REL_RETRY, REL_NO_RETRY, REL_FAILURE))); + public static final Set RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList( + REL_SUCCESS_REQ, REL_RESPONSE, REL_RETRY, REL_NO_RETRY, REL_FAILURE))); - } + private volatile Set dynamicPropertyNames = new HashSet<>(); /** - * A single invocation of an HTTP request/response from the InvokeHTTP processor. This class encapsulates the entirety of the flowfile processing. - *

- * This class is not thread safe and is created new for every flowfile processed. + * Pattern used to compute RFC 2616 Dates (#sec3.3.1). This format is used by the HTTP Date header and is optionally sent by the processor. This date is effectively an RFC 822/1123 date + * string, but HTTP requires it to be in GMT (preferring the literal 'GMT' string). */ - private static class Transaction implements Config { + private static final String RFC_1123 = "EEE, dd MMM yyyy HH:mm:ss 'GMT'"; + private static final DateTimeFormatter DATE_FORMAT = DateTimeFormat.forPattern(RFC_1123).withLocale(Locale.US).withZoneUTC(); - /** - * Pattern used to compute RFC 2616 Dates (#sec3.3.1). This format is used by the HTTP Date header and is optionally sent by the processor. This date is effectively an RFC 822/1123 date - * string, but HTTP requires it to be in GMT (preferring the literal 'GMT' string). - */ - private static final String rfc1123 = "EEE, dd MMM yyyy HH:mm:ss 'GMT'"; - private static final DateTimeFormatter dateFormat = DateTimeFormat.forPattern(rfc1123).withLocale(Locale.US).withZoneUTC(); + private final AtomicReference okHttpClientAtomicReference = new AtomicReference<>(); - /** - * Every request/response cycle from this client has a unique transaction id which will be stored as a flowfile attribute. This generator is used to create the id. - */ - private static final AtomicLong txIdGenerator = new AtomicLong(); + public static final String DEFAULT_CONTENT_TYPE = "application/octet-stream"; - private static final Charset utf8 = Charset.forName("UTF-8"); + @Override + protected List getSupportedPropertyDescriptors() { + return PROPERTIES; + } - private final ProcessorLog logger; - private final SSLContext sslContext; - private final Pattern attributesToSend; - private final ProcessContext context; - private final ProcessSession session; + @Override + protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(String propertyDescriptorName) { + return new PropertyDescriptor.Builder() + .required(false) + .name(propertyDescriptorName) + .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING, true)) + .dynamic(true) + .expressionLanguageSupported(true) + .build(); + } - private final long txId = txIdGenerator.incrementAndGet(); - private final long startNanos = System.nanoTime(); + @Override + public Set getRelationships() { + return RELATIONSHIPS; + } - private FlowFile request; - private FlowFile response; - private HttpURLConnection conn; + private volatile Pattern regexAttributesToSend = null; - private int statusCode; - private String statusMessage; - - public Transaction( - final ProcessorLog logger, - final SSLContext sslContext, - final Pattern attributesToSend, - final ProcessContext context, - final ProcessSession session, - final FlowFile request) { - - this.logger = logger; - this.sslContext = sslContext; - this.attributesToSend = attributesToSend; - this.context = context; - this.session = session; - this.request = request; - } - - - public void process() { - try { - openConnection(); - sendRequest(); - readResponse(); - transfer(); - } catch (final Exception e) { - // log exception - logger.error("Routing to {} due to exception: {}", new Object[] {REL_FAILURE.getName(), e}, e); - - // penalize - request = session.penalize(request); - - // transfer original to failure - session.transfer(request, REL_FAILURE); - - // cleanup response flowfile, if applicable - try { - if (response != null) { - session.remove(response); - } - } catch (final Exception e1) { - logger.error("Could not cleanup response flowfile due to exception: {}", new Object[] {e1}, e1); + @Override + public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) { + if (descriptor.isDynamic()) { + final Set newDynamicPropertyNames = new HashSet<>(dynamicPropertyNames); + if (newValue == null) { + newDynamicPropertyNames.remove(descriptor.getName()); + } else if (oldValue == null) { // new property + newDynamicPropertyNames.add(descriptor.getName()); + } + this.dynamicPropertyNames = Collections.unmodifiableSet(newDynamicPropertyNames); + } else { + // compile the attributes-to-send filter pattern + if (PROP_ATTRIBUTES_TO_SEND.getName().equalsIgnoreCase(descriptor.getName())) { + if (newValue.isEmpty()) { + regexAttributesToSend = null; + } else { + final String trimmedValue = StringUtils.trimToEmpty(newValue); + regexAttributesToSend = Pattern.compile(trimmedValue); } } } + } - private void openConnection() throws IOException { + @Override + protected Collection customValidate(final ValidationContext validationContext) { + final List results = new ArrayList<>(1); + final boolean proxyHostSet = validationContext.getProperty(PROP_PROXY_HOST).isSet(); + final boolean proxyPortSet = validationContext.getProperty(PROP_PROXY_PORT).isSet(); + + if ((proxyHostSet && !proxyPortSet) || (!proxyHostSet && proxyPortSet)) { + results.add(new ValidationResult.Builder().subject("Proxy Host and Port").valid(false).explanation("If Proxy Host or Proxy Port is set, both must be set").build()); + } + + return results; + } + + @OnScheduled + public void setUpClient(final ProcessContext context) throws IOException { + okHttpClientAtomicReference.set(null); + + OkHttpClient okHttpClient = new OkHttpClient(); + + // Add a proxy if set + final String proxyHost = context.getProperty(PROP_PROXY_HOST).getValue(); + final Integer proxyPort = context.getProperty(PROP_PROXY_PORT).asInteger(); + if (proxyHost != null && proxyPort != null) { + final Proxy proxy = new Proxy(Type.HTTP, new InetSocketAddress(proxyHost, proxyPort)); + okHttpClient.setProxy(proxy); + } + + // Set timeouts + okHttpClient.setConnectTimeout((context.getProperty(PROP_CONNECT_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue()), TimeUnit.MILLISECONDS); + okHttpClient.setReadTimeout(context.getProperty(PROP_READ_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue(), TimeUnit.MILLISECONDS); + + // Set whether to follow redirects + okHttpClient.setFollowRedirects(context.getProperty(PROP_FOLLOW_REDIRECTS).asBoolean()); + + final SSLContextService sslService = context.getProperty(PROP_SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); + final SSLContext sslContext = sslService == null ? null : sslService.createSSLContext(ClientAuth.NONE); + + // check if the ssl context is set and add the factory if so + if (sslContext != null) { + okHttpClient.setSslSocketFactory(sslContext.getSocketFactory()); + } + + // check the trusted hostname property and override the HostnameVerifier + String trustedHostname = trimToEmpty(context.getProperty(PROP_TRUSTED_HOSTNAME).getValue()); + if (!trustedHostname.isEmpty()) { + okHttpClient.setHostnameVerifier(new OverrideHostnameVerifier(trustedHostname, okHttpClient.getHostnameVerifier())); + } + + final String authUser = trimToEmpty(context.getProperty(PROP_BASIC_AUTH_USERNAME).getValue()); + + // If the username/password properties are set then check if digest auth is being used + if (!authUser.isEmpty() && "true".equalsIgnoreCase(context.getProperty(PROP_DIGEST_AUTH).getValue())) { + final String authPass = trimToEmpty(context.getProperty(PROP_BASIC_AUTH_PASSWORD).getValue()); + + /* + * Currently OkHttp doesn't have built-in Digest Auth Support. The ticket for adding it is here: + * https://github.com/square/okhttp/issues/205#issuecomment-154047052 + * Once added this should be refactored to use the built in support. For now, a third party lib is needed. + */ + final Map authCache = new ConcurrentHashMap<>(); + + com.burgstaller.okhttp.digest.Credentials credentials = new com.burgstaller.okhttp.digest.Credentials(authUser, authPass); + + final DigestAuthenticator digestAuthenticator = new DigestAuthenticator(credentials); + + DispatchingAuthenticator authenticator = new DispatchingAuthenticator.Builder() + .with("Digest", digestAuthenticator) + .build(); + + okHttpClient.interceptors().add(new AuthenticationCacheInterceptor(authCache)); + okHttpClient.setAuthenticator(new CachingAuthenticatorDecorator(authenticator, authCache)); + } + + okHttpClientAtomicReference.set(okHttpClient); + } + + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + OkHttpClient okHttpClient = okHttpClientAtomicReference.get(); + + FlowFile requestFlowFile = session.get(); + + // Checking to see if the property to put the body of the response in an attribute was set + boolean putToAttribute = context.getProperty(PROP_PUT_OUTPUT_IN_ATTRIBUTE).isSet(); + if (requestFlowFile == null) { + if(context.hasNonLoopConnection()){ + return; + } + + String request = context.getProperty(PROP_METHOD).evaluateAttributeExpressions().getValue().toUpperCase(); + if ("POST".equals(request) || "PUT".equals(request)) { + return; + } else if (putToAttribute) { + requestFlowFile = session.create(); + } + } + + // Setting some initial variables + final int maxAttributeSize = context.getProperty(PROP_PUT_ATTRIBUTE_MAX_LENGTH).asInteger(); + final ProcessorLog logger = getLogger(); + + // Every request/response cycle has a unique transaction id which will be stored as a flowfile attribute. + final UUID txId = UUID.randomUUID(); + + FlowFile responseFlowFile = null; + try { // read the url property from the context - final String urlstr = trimToEmpty(context.getProperty(PROP_URL).evaluateAttributeExpressions(request).getValue()); + final String urlstr = trimToEmpty(context.getProperty(PROP_URL).evaluateAttributeExpressions(requestFlowFile).getValue()); final URL url = new URL(urlstr); - final String authuser = trimToEmpty(context.getProperty(PROP_BASIC_AUTH_USERNAME).getValue()); - final String authpass = trimToEmpty(context.getProperty(PROP_BASIC_AUTH_PASSWORD).getValue()); - String authstrencoded = null; - if (!authuser.isEmpty()) { - String authstr = authuser + ":" + authpass; - byte[] bytestrencoded = Base64.encodeBase64(authstr.getBytes(StandardCharsets.UTF_8)); - authstrencoded = new String(bytestrencoded, StandardCharsets.UTF_8); - } - - // create the connection - final String proxyHost = context.getProperty(PROP_PROXY_HOST).getValue(); - final Integer proxyPort = context.getProperty(PROP_PROXY_PORT).asInteger(); - if (proxyHost == null || proxyPort == null) { - conn = (HttpURLConnection) url.openConnection(); - } else { - final Proxy proxy = new Proxy(Type.HTTP, new InetSocketAddress(proxyHost, proxyPort)); - conn = (HttpURLConnection) url.openConnection(proxy); - } - - if (authstrencoded != null) { - conn.setRequestProperty("Authorization", "Basic " + authstrencoded); - } - - // set the request method - String method = trimToEmpty(context.getProperty(PROP_METHOD).evaluateAttributeExpressions(request).getValue()).toUpperCase(); - conn.setRequestMethod(method); - - // set timeouts - conn.setConnectTimeout(context.getProperty(PROP_CONNECT_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue()); - conn.setReadTimeout(context.getProperty(PROP_READ_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue()); - - // set whether to follow redirects - conn.setInstanceFollowRedirects(context.getProperty(PROP_FOLLOW_REDIRECTS).asBoolean()); - - // special handling for https - if (conn instanceof HttpsURLConnection) { - HttpsURLConnection sconn = (HttpsURLConnection) conn; - - // check if the ssl context is set - if (sslContext != null) { - sconn.setSSLSocketFactory(sslContext.getSocketFactory()); - } - - // check the trusted hostname property and override the HostnameVerifier - String trustedHostname = trimToEmpty(context.getProperty(PROP_TRUSTED_HOSTNAME).getValue()); - if (!trustedHostname.isEmpty()) { - sconn.setHostnameVerifier(new OverrideHostnameVerifier(trustedHostname, sconn.getHostnameVerifier())); - } - } - - } - - private void sendRequest() throws IOException { - // set the http request properties using flowfile attribute values - setRequestProperties(); + Request httpRequest = configureRequest(context, session, requestFlowFile, url); // log request - logRequest(); + logRequest(logger, httpRequest); - // we only stream data for POST and PUT requests - String method = conn.getRequestMethod().toUpperCase(); - if ("POST".equals(method) || "PUT".equals(method)) { - conn.setDoOutput(true); - conn.setFixedLengthStreamingMode(request.getSize()); - - // write the flowfile contents to the output stream - try (OutputStream os = new BufferedOutputStream(conn.getOutputStream())) { - session.exportTo(request, os); - } - - // emit provenance event - session.getProvenanceReporter().send(request, conn.getURL().toExternalForm()); + // emit send provenance event if successfully sent to the server + if (httpRequest.body() != null) { + session.getProvenanceReporter().send(requestFlowFile, url.toExternalForm(), true); } - } - - private void readResponse() throws IOException { + final long startNanos = System.nanoTime(); + Response responseHttp = okHttpClient.newCall(httpRequest).execute(); // output the raw response headers (DEBUG level only) - logResponse(); + logResponse(logger, url, responseHttp); // store the status code and message - statusCode = conn.getResponseCode(); - statusMessage = conn.getResponseMessage(); + int statusCode = responseHttp.code(); + String statusMessage = responseHttp.message(); - // always write the status attributes to the request flowfile - request = writeStatusAttributes(request); - - // read from the appropriate input stream - try (InputStream is = getResponseStream()) { - - // if not successful, store the response body into a flowfile attribute - if (!isSuccess()) { - String body = trimToEmpty(toString(is, utf8)); - request = session.putAttribute(request, RESPONSE_BODY, body); - } - - // if successful, store the response body as the flowfile payload - // we include additional flowfile attributes including the reponse headers - // and the status codes. - if (isSuccess()) { - // clone the flowfile to capture the response - response = session.create(request); - - // write the status attributes - response = writeStatusAttributes(response); - - // write the response headers as attributes - // this will overwrite any existing flowfile attributes - response = session.putAllAttributes(response, convertAttributesFromHeaders()); - - // transfer the message body to the payload - // can potentially be null in edge cases - if (is != null) { - response = session.importFrom(is, response); - - // emit provenance event - final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos); - session.getProvenanceReporter().fetch(response, conn.getURL().toExternalForm(), millis); - } - - } - - } - - } - - private void transfer() throws IOException { - // check if we should penalize the request - if (!isSuccess()) { - request = session.penalize(request); - } - - // log the status codes from the response - logger.info("Request to {} returned status code {} for {}", - new Object[] {conn.getURL().toExternalForm(), statusCode, request}); - - // transfer to the correct relationship - // 2xx -> SUCCESS - if (isSuccess()) { - // we have two flowfiles to transfer - session.transfer(request, REL_SUCCESS_REQ); - session.transfer(response, REL_SUCCESS_RESP); - - // 5xx -> RETRY - } else if (statusCode / 100 == 5) { - session.transfer(request, REL_RETRY); - - // 1xx, 3xx, 4xx -> NO RETRY - } else { - session.transfer(request, REL_NO_RETRY); - } - - } - - private void setRequestProperties() { - - // check if we should send the a Date header with the request - if (context.getProperty(PROP_DATE_HEADER).asBoolean()) { - conn.setRequestProperty("Date", getDateValue()); - } - - // iterate through the flowfile attributes, adding any attribute that - // matches the attributes-to-send pattern. if the pattern is not set - // (it's an optional property), ignore that attribute entirely - if (attributesToSend != null) { - Map attributes = request.getAttributes(); - Matcher m = attributesToSend.matcher(""); - for (Map.Entry entry : attributes.entrySet()) { - String key = trimToEmpty(entry.getKey()); - String val = trimToEmpty(entry.getValue()); - - // don't include any of the ignored attributes - if (IGNORED_ATTRIBUTES.contains(key)) { - continue; - } - - // check if our attribute key matches the pattern - // if so, include in the request as a header - m.reset(key); - if (m.matches()) { - conn.setRequestProperty(key, val); - } - } - } - } - - /** - * Returns a Map of flowfile attributes from the response http headers. Multivalue headers are naively converted to comma separated strings. - */ - private Map convertAttributesFromHeaders() throws IOException { - // create a new hashmap to store the values from the connection - Map map = new HashMap<>(); - for (Map.Entry> entry : conn.getHeaderFields().entrySet()) { - String key = entry.getKey(); - if (key == null) { - continue; - } - - List values = entry.getValue(); - - // we ignore any headers with no actual values (rare) - if (values == null || values.isEmpty()) { - continue; - } - - // create a comma separated string from the values, this is stored in the map - String value = csv(values); - - // put the csv into the map - map.put(key, value); - } - - if (conn instanceof HttpsURLConnection) { - HttpsURLConnection sconn = (HttpsURLConnection) conn; - // this should seemingly not be required, but somehow the state of the jdk client is messed up - // when retrieving SSL certificate related information if connect() has not been called previously. - sconn.connect(); - map.put(REMOTE_DN, sconn.getPeerPrincipal().getName()); - } - - return map; - } - - private boolean isSuccess() throws IOException { if (statusCode == 0) { throw new IllegalStateException("Status code unknown, connection hasn't been attempted."); } - return statusCode / 100 == 2; - } - private void logRequest() { - logger.debug("\nRequest to remote service:\n\t{}\n{}", - new Object[] {conn.getURL().toExternalForm(), getLogString(conn.getRequestProperties())}); - } + // Create a map of the status attributes that are always written to the request and reponse FlowFiles + Map statusAttributes = new HashMap<>(); + statusAttributes.put(STATUS_CODE, String.valueOf(statusCode)); + statusAttributes.put(STATUS_MESSAGE, statusMessage); + statusAttributes.put(REQUEST_URL, url.toExternalForm()); + statusAttributes.put(TRANSACTION_ID, txId.toString()); - private void logResponse() { - logger.debug("\nResponse from remote service:\n\t{}\n{}", - new Object[] {conn.getURL().toExternalForm(), getLogString(conn.getHeaderFields())}); - } - - private String getLogString(Map> map) { - StringBuilder sb = new StringBuilder(); - for (Map.Entry> entry : map.entrySet()) { - List list = entry.getValue(); - if (list.isEmpty()) { - continue; - } - sb.append("\t"); - sb.append(entry.getKey()); - sb.append(": "); - if (list.size() == 1) { - sb.append(list.get(0)); - } else { - sb.append(list.toString()); - } - sb.append("\n"); - } - return sb.toString(); - } - - /** - * Convert a collection of string values into a overly simple comma separated string. - * - * Does not handle the case where the value contains the delimiter. i.e. if a value contains a comma, this method does nothing to try and escape or quote the value, in traditional csv style. - */ - private String csv(Collection values) { - if (values == null || values.isEmpty()) { - return ""; - } - if (values.size() == 1) { - return values.iterator().next(); + if (requestFlowFile != null) { + requestFlowFile = session.putAllAttributes(requestFlowFile, statusAttributes); } - StringBuilder sb = new StringBuilder(); - for (String value : values) { - value = value.trim(); - if (value.isEmpty()) { - continue; - } - if (sb.length() > 0) { - sb.append(", "); - } - sb.append(value); - } - return sb.toString().trim(); - } - - /** - * Return the current datetime as an RFC 1123 formatted string in the GMT tz. - */ - private String getDateValue() { - return dateFormat.print(System.currentTimeMillis()); - } - - /** - * Returns a string from the input stream using the specified character encoding. - */ - private String toString(InputStream is, Charset charset) throws IOException { - if (is == null) { - return ""; + // If the property to add the response headers to the request flowfile is true then add them + if (context.getProperty(PROP_ADD_HEADERS_TO_REQUEST).asBoolean() && requestFlowFile != null) { + // write the response headers as attributes + // this will overwrite any existing flowfile attributes + requestFlowFile = session.putAllAttributes(requestFlowFile, convertAttributesFromHeaders(url, responseHttp)); } - ByteArrayOutputStream out = new ByteArrayOutputStream(); - byte[] buf = new byte[4096]; - int len; - while ((len = is.read(buf)) != -1) { - out.write(buf, 0, len); - } - return new String(out.toByteArray(), charset); - } + boolean outputBodyToRequestAttribute = (!isSuccess(statusCode) || putToAttribute) && requestFlowFile != null; + boolean outputBodyToResponseContent = (isSuccess(statusCode) && !putToAttribute) || context.getProperty(PROP_OUTPUT_RESPONSE_REGARDLESS).asBoolean(); + ResponseBody responseBody = responseHttp.body(); + boolean bodyExists = responseBody != null; - /** - * Returns the input stream to use for reading from the remote server. We're either going to want the inputstream or errorstream, effectively depending on the status code. - *

- * This method can return null if there is no inputstream to read from. For example, if the remote server did not send a message body. eg. 204 No Content or 304 Not Modified - */ - private InputStream getResponseStream() { + InputStream responseBodyStream = null; + SoftLimitBoundedByteArrayOutputStream outputStreamToRequestAttribute = null; + TeeInputStream teeInputStream = null; try { - InputStream is = conn.getErrorStream(); - if (is == null) { - is = conn.getInputStream(); + responseBodyStream = bodyExists ? responseBody.byteStream() : null; + if (responseBodyStream != null && outputBodyToRequestAttribute && outputBodyToResponseContent) { + outputStreamToRequestAttribute = new SoftLimitBoundedByteArrayOutputStream(maxAttributeSize); + teeInputStream = new TeeInputStream(responseBodyStream, outputStreamToRequestAttribute); } - return new BufferedInputStream(is); - } catch (IOException e) { - logger.warn("Response stream threw an exception: {}", new Object[] {e}, e); - return null; - } - } + if (outputBodyToResponseContent) { + /* + * If successful and putting to response flowfile, store the response body as the flowfile payload + * we include additional flowfile attributes including the response headers and the status codes. + */ - /** - * Writes the status attributes onto the flowfile, returning the flowfile that was updated. - */ - private FlowFile writeStatusAttributes(FlowFile flowfile) { - flowfile = session.putAttribute(flowfile, STATUS_CODE, String.valueOf(statusCode)); - flowfile = session.putAttribute(flowfile, STATUS_MESSAGE, statusMessage); - flowfile = session.putAttribute(flowfile, REQUEST_URL, conn.getURL().toExternalForm()); - flowfile = session.putAttribute(flowfile, TRANSACTION_ID, Long.toString(txId)); - return flowfile; - } + // clone the flowfile to capture the response + if (requestFlowFile != null) { + responseFlowFile = session.create(requestFlowFile); + } else { + responseFlowFile = session.create(); + } - /** - * - */ - private static class OverrideHostnameVerifier implements HostnameVerifier { + // write the status attributes + responseFlowFile = session.putAllAttributes(responseFlowFile, statusAttributes); - private final String trustedHostname; - private final HostnameVerifier delegate; + // write the response headers as attributes + // this will overwrite any existing flowfile attributes + responseFlowFile = session.putAllAttributes(responseFlowFile, convertAttributesFromHeaders(url, responseHttp)); - private OverrideHostnameVerifier(String trustedHostname, HostnameVerifier delegate) { - this.trustedHostname = trustedHostname; - this.delegate = delegate; + // transfer the message body to the payload + // can potentially be null in edge cases + if (bodyExists) { + if (teeInputStream != null) { + responseFlowFile = session.importFrom(teeInputStream, responseFlowFile); + } else { + responseFlowFile = session.importFrom(responseBodyStream, responseFlowFile); + } + + // emit provenance event + final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos); + session.getProvenanceReporter().fetch(responseFlowFile, url.toExternalForm(), millis); + } + } + + // if not successful and request flowfile is not null, store the response body into a flowfile attribute + if (outputBodyToRequestAttribute && bodyExists) { + String attributeKey = context.getProperty(PROP_PUT_OUTPUT_IN_ATTRIBUTE).evaluateAttributeExpressions(requestFlowFile).getValue(); + if (attributeKey == null) { + attributeKey = RESPONSE_BODY; + } + byte[] outputBuffer; + int size; + + if (outputStreamToRequestAttribute != null) { + outputBuffer = outputStreamToRequestAttribute.getBuffer(); + size = outputStreamToRequestAttribute.size(); + } else { + outputBuffer = new byte[maxAttributeSize]; + size = StreamUtils.fillBuffer(responseBodyStream, outputBuffer, false); + } + String bodyString = new String(outputBuffer, 0, size, getCharsetFromMediaType(responseBody.contentType())); + requestFlowFile = session.putAttribute(requestFlowFile, attributeKey, bodyString); + + final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos); + session.getProvenanceReporter().modifyAttributes(requestFlowFile, "The " + attributeKey + " has been added. The value of which is the body of a http call to " + + url.toExternalForm() + ". It took " + millis + "millis,"); + } + } finally { + if(outputStreamToRequestAttribute != null){ + outputStreamToRequestAttribute.close(); + outputStreamToRequestAttribute = null; + } + if(teeInputStream != null){ + teeInputStream.close(); + teeInputStream = null; + } else if(responseBodyStream != null){ + responseBodyStream.close(); + responseBodyStream = null; + } } - @Override - public boolean verify(String hostname, SSLSession session) { - if (trustedHostname.equalsIgnoreCase(hostname)) { - return true; + route(requestFlowFile, responseFlowFile, session, context, statusCode); + } catch (final Exception e) { + // penalize or yield + if (requestFlowFile != null) { + logger.error("Routing to {} due to exception: {}", new Object[]{REL_FAILURE.getName(), e}, e); + requestFlowFile = session.penalize(requestFlowFile); + // transfer original to failure + session.transfer(requestFlowFile, REL_FAILURE); + } else { + logger.error("Yielding processor due to exception encountered as a source processor: {}", e); + context.yield(); + } + + + // cleanup response flowfile, if applicable + try { + if (responseFlowFile != null) { + session.remove(responseFlowFile); } - return delegate.verify(hostname, session); + } catch (final Exception e1) { + logger.error("Could not cleanup response flowfile due to exception: {}", new Object[]{e1}, e1); } } } + + private Request configureRequest(final ProcessContext context, final ProcessSession session, final FlowFile requestFlowFile, URL url) { + Request.Builder requestBuilder = new Request.Builder(); + + requestBuilder = requestBuilder.url(url); + final String authUser = trimToEmpty(context.getProperty(PROP_BASIC_AUTH_USERNAME).getValue()); + + // If the username/password properties are set then check if digest auth is being used + if (!authUser.isEmpty() && "false".equalsIgnoreCase(context.getProperty(PROP_DIGEST_AUTH).getValue())) { + final String authPass = trimToEmpty(context.getProperty(PROP_BASIC_AUTH_PASSWORD).getValue()); + + String credential = com.squareup.okhttp.Credentials.basic(authUser, authPass); + requestBuilder = requestBuilder.header("Authorization", credential); + } + + // set the request method + String method = trimToEmpty(context.getProperty(PROP_METHOD).evaluateAttributeExpressions(requestFlowFile).getValue()).toUpperCase(); + switch (method) { + case "GET": + requestBuilder = requestBuilder.get(); + break; + case "POST": + RequestBody requestBody = getRequestBodyToSend(session, requestFlowFile); + requestBuilder = requestBuilder.post(requestBody); + break; + case "PUT": + requestBody = getRequestBodyToSend(session, requestFlowFile); + requestBuilder = requestBuilder.put(requestBody); + break; + case "HEAD": + requestBuilder = requestBuilder.head(); + break; + case "DELETE": + requestBuilder = requestBuilder.delete(); + break; + default: + requestBuilder = requestBuilder.method(method, null); + } + + requestBuilder = setHeaderProperties(context, requestBuilder, requestFlowFile); + + return requestBuilder.build(); + } + + private RequestBody getRequestBodyToSend(final ProcessSession session, final FlowFile requestFlowFile) { + return new RequestBody() { + @Override + public MediaType contentType() { + final String attributeValue = requestFlowFile.getAttribute(CoreAttributes.MIME_TYPE.key()); + String contentType = attributeValue == null ? DEFAULT_CONTENT_TYPE : attributeValue; + return MediaType.parse(contentType); + } + + @Override + public void writeTo(BufferedSink sink) throws IOException { + session.exportTo(requestFlowFile, sink.outputStream()); + } + }; + } + + private Request.Builder setHeaderProperties(final ProcessContext context, Request.Builder requestBuilder, final FlowFile requestFlowFile) { + // check if we should send the a Date header with the request + if (context.getProperty(PROP_DATE_HEADER).asBoolean()) { + requestBuilder = requestBuilder.addHeader("Date", DATE_FORMAT.print(System.currentTimeMillis())); + } + + for (String headerKey : dynamicPropertyNames) { + String headerValue = context.getProperty(headerKey).evaluateAttributeExpressions(requestFlowFile).getValue(); + requestBuilder = requestBuilder.addHeader(headerKey, headerValue); + } + + // iterate through the flowfile attributes, adding any attribute that + // matches the attributes-to-send pattern. if the pattern is not set + // (it's an optional property), ignore that attribute entirely + if (regexAttributesToSend != null) { + Map attributes = requestFlowFile.getAttributes(); + Matcher m = regexAttributesToSend.matcher(""); + for (Map.Entry entry : attributes.entrySet()) { + String headerKey = trimToEmpty(entry.getKey()); + + // don't include any of the ignored attributes + if (IGNORED_ATTRIBUTES.contains(headerKey)) { + continue; + } + + // check if our attribute key matches the pattern + // if so, include in the request as a header + m.reset(headerKey); + if (m.matches()) { + String headerVal = trimToEmpty(entry.getValue()); + requestBuilder = requestBuilder.addHeader(headerKey, headerVal); + } + } + } + return requestBuilder; + } + + + private void route(FlowFile request, FlowFile response, ProcessSession session, ProcessContext context, int statusCode){ + // check if we should penalize the request + if (!isSuccess(statusCode)) { + if (request == null) { + context.yield(); + } else { + request = session.penalize(request); + } + } + + // If the property to output the response flowfile regardless of status code is set then transfer it + boolean responseSent = false; + if (context.getProperty(PROP_OUTPUT_RESPONSE_REGARDLESS).asBoolean()) { + session.transfer(response, REL_RESPONSE); + responseSent = true; + } + + // transfer to the correct relationship + // 2xx -> SUCCESS + if (isSuccess(statusCode)) { + // we have two flowfiles to transfer + if (request != null) { + session.transfer(request, REL_SUCCESS_REQ); + } + if (response != null && !responseSent) { + session.transfer(response, REL_RESPONSE); + } + + // 5xx -> RETRY + } else if (statusCode / 100 == 5) { + if (request != null) { + session.transfer(request, REL_RETRY); + } + + // 1xx, 3xx, 4xx -> NO RETRY + } else { + if (request != null) { + session.transfer(request, REL_NO_RETRY); + } + } + + } + + private boolean isSuccess(int statusCode) { + return statusCode / 100 == 2; + } + + private void logRequest(ProcessorLog logger, Request request) { + logger.debug("\nRequest to remote service:\n\t{}\n{}", + new Object[]{request.url().toExternalForm(), getLogString(request.headers().toMultimap())}); + } + + private void logResponse(ProcessorLog logger, URL url, Response response) { + logger.debug("\nResponse from remote service:\n\t{}\n{}", + new Object[]{url.toExternalForm(), getLogString(response.headers().toMultimap())}); + } + + private String getLogString(Map> map) { + StringBuilder sb = new StringBuilder(); + for (Map.Entry> entry : map.entrySet()) { + List list = entry.getValue(); + if (list.isEmpty()) { + continue; + } + sb.append("\t"); + sb.append(entry.getKey()); + sb.append(": "); + if (list.size() == 1) { + sb.append(list.get(0)); + } else { + sb.append(list.toString()); + } + sb.append("\n"); + } + return sb.toString(); + } + + /** + * Convert a collection of string values into a overly simple comma separated string. + *

+ * Does not handle the case where the value contains the delimiter. i.e. if a value contains a comma, this method does nothing to try and escape or quote the value, in traditional csv style. + */ + private String csv(Collection values) { + if (values == null || values.isEmpty()) { + return ""; + } + if (values.size() == 1) { + return values.iterator().next(); + } + + StringBuilder sb = new StringBuilder(); + for (String value : values) { + value = value.trim(); + if (value.isEmpty()) { + continue; + } + if (sb.length() > 0) { + sb.append(", "); + } + sb.append(value); + } + return sb.toString().trim(); + } + + /** + * Returns a Map of flowfile attributes from the response http headers. Multivalue headers are naively converted to comma separated strings. + */ + private Map convertAttributesFromHeaders(URL url, Response responseHttp){ + // create a new hashmap to store the values from the connection + Map map = new HashMap<>(); + for (Map.Entry> entry : responseHttp.headers().toMultimap().entrySet()) { + String key = entry.getKey(); + if (key == null) { + continue; + } + + List values = entry.getValue(); + + // we ignore any headers with no actual values (rare) + if (values == null || values.isEmpty()) { + continue; + } + + // create a comma separated string from the values, this is stored in the map + String value = csv(values); + + // put the csv into the map + map.put(key, value); + } + + if ("HTTPS".equals(url.getProtocol().toUpperCase())) { + map.put(REMOTE_DN, responseHttp.handshake().peerPrincipal().getName()); + } + + return map; + } + + private Charset getCharsetFromMediaType(MediaType contentType) { + return contentType != null ? contentType.charset(StandardCharsets.UTF_8) : StandardCharsets.UTF_8; + } + + private static class OverrideHostnameVerifier implements HostnameVerifier { + + private final String trustedHostname; + private final HostnameVerifier delegate; + + private OverrideHostnameVerifier(String trustedHostname, HostnameVerifier delegate) { + this.trustedHostname = trustedHostname; + this.delegate = delegate; + } + + @Override + public boolean verify(String hostname, SSLSession session) { + if (trustedHostname.equalsIgnoreCase(hostname)) { + return true; + } + return delegate.verify(hostname, session); + } + } } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestInvokeHTTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestInvokeHTTP.java index a26b2eddac..a82bc5a1f6 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestInvokeHTTP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestInvokeHTTP.java @@ -19,22 +19,27 @@ package org.apache.nifi.processors.standard; import java.io.IOException; import java.io.PrintWriter; import java.net.URL; +import java.util.HashMap; +import java.util.Map; import javax.servlet.ServletException; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import org.apache.nifi.processors.standard.util.TestInvokeHttpCommon; +import org.apache.nifi.ssl.StandardSSLContextService; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunners; import org.eclipse.jetty.server.Request; import org.eclipse.jetty.server.handler.AbstractHandler; import org.junit.After; import org.junit.AfterClass; +import org.junit.Assert; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; + public class TestInvokeHTTP extends TestInvokeHttpCommon { @BeforeClass @@ -72,41 +77,99 @@ public class TestInvokeHTTP extends TestInvokeHttpCommon { return new TestServer(); } + @Test + public void testSslSetHttpRequest() throws Exception { + + final Map sslProperties = new HashMap<>(); + sslProperties.put(StandardSSLContextService.KEYSTORE.getName(), "src/test/resources/localhost-ks.jks"); + sslProperties.put(StandardSSLContextService.KEYSTORE_PASSWORD.getName(), "localtest"); + sslProperties.put(StandardSSLContextService.KEYSTORE_TYPE.getName(), "JKS"); + sslProperties.put(StandardSSLContextService.TRUSTSTORE.getName(), "src/test/resources/localhost-ts.jks"); + sslProperties.put(StandardSSLContextService.TRUSTSTORE_PASSWORD.getName(), "localtest"); + sslProperties.put(StandardSSLContextService.TRUSTSTORE_TYPE.getName(), "JKS"); + + runner = TestRunners.newTestRunner(InvokeHTTP.class); + final StandardSSLContextService sslService = new StandardSSLContextService(); + runner.addControllerService("ssl-context", sslService, sslProperties); + runner.enableControllerService(sslService); + runner.setProperty(InvokeHTTP.PROP_SSL_CONTEXT_SERVICE, "ssl-context"); + + addHandler(new GetOrHeadHandler()); + + runner.setProperty(InvokeHTTP.PROP_URL, url + "/status/200"); + + createFlowFiles(runner); + + runner.run(); + + runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 1); + runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 1); + runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0); + runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0); + runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0); + + // expected in request status.code and status.message + // original flow file (+attributes) + final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_SUCCESS_REQ).get(0); + bundle.assertContentEquals("Hello".getBytes("UTF-8")); + bundle.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200"); + bundle.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK"); + bundle.assertAttributeEquals("Foo", "Bar"); + + // expected in response + // status code, status message, all headers from server response --> ff attributes + // server response message body into payload of ff + final MockFlowFile bundle1 = runner.getFlowFilesForRelationship(InvokeHTTP.REL_RESPONSE).get(0); + bundle1.assertContentEquals("/status/200".getBytes("UTF-8")); + bundle1.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200"); + bundle1.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK"); + bundle1.assertAttributeEquals("Foo", "Bar"); + bundle1.assertAttributeEquals("Content-Type", "text/plain; charset=ISO-8859-1"); + bundle1.assertAttributeEquals("OkHttp-Selected-Protocol", "http/1.1"); + } + // Currently InvokeHttp does not support Proxy via Https @Test public void testProxy() throws Exception { addHandler(new MyProxyHandler()); URL proxyURL = new URL(url); - runner.setProperty(InvokeHTTP.Config.PROP_URL, "http://nifi.apache.org/"); // just a dummy URL no connection goes out - runner.setProperty(InvokeHTTP.Config.PROP_PROXY_HOST, proxyURL.getHost()); - runner.setProperty(InvokeHTTP.Config.PROP_PROXY_PORT, String.valueOf(proxyURL.getPort())); + runner.setProperty(InvokeHTTP.PROP_URL, "http://nifi.apache.org/"); // just a dummy URL no connection goes out + runner.setProperty(InvokeHTTP.PROP_PROXY_HOST, proxyURL.getHost()); + + try{ + runner.run(); + Assert.fail(); + } catch (AssertionError e){ + // Expect assetion error when proxy port isn't set but host is. + } + runner.setProperty(InvokeHTTP.PROP_PROXY_PORT, String.valueOf(proxyURL.getPort())); createFlowFiles(runner); runner.run(); - runner.assertTransferCount(InvokeHTTP.Config.REL_SUCCESS_REQ, 1); - runner.assertTransferCount(InvokeHTTP.Config.REL_SUCCESS_RESP, 1); - runner.assertTransferCount(InvokeHTTP.Config.REL_RETRY, 0); - runner.assertTransferCount(InvokeHTTP.Config.REL_NO_RETRY, 0); - runner.assertTransferCount(InvokeHTTP.Config.REL_FAILURE, 0); + runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 1); + runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 1); + runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0); + runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0); + runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0); //expected in request status.code and status.message //original flow file (+attributes) - final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.Config.REL_SUCCESS_REQ).get(0); + final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_SUCCESS_REQ).get(0); bundle.assertContentEquals("Hello".getBytes("UTF-8")); - bundle.assertAttributeEquals(InvokeHTTP.Config.STATUS_CODE, "200"); - bundle.assertAttributeEquals(InvokeHTTP.Config.STATUS_MESSAGE, "OK"); + bundle.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200"); + bundle.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK"); bundle.assertAttributeEquals("Foo", "Bar"); //expected in response //status code, status message, all headers from server response --> ff attributes //server response message body into payload of ff - final MockFlowFile bundle1 = runner.getFlowFilesForRelationship(InvokeHTTP.Config.REL_SUCCESS_RESP).get(0); + final MockFlowFile bundle1 = runner.getFlowFilesForRelationship(InvokeHTTP.REL_RESPONSE).get(0); bundle1.assertContentEquals("http://nifi.apache.org/".getBytes("UTF-8")); - bundle1.assertAttributeEquals(InvokeHTTP.Config.STATUS_CODE, "200"); - bundle1.assertAttributeEquals(InvokeHTTP.Config.STATUS_MESSAGE, "OK"); + bundle1.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200"); + bundle1.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK"); bundle1.assertAttributeEquals("Foo", "Bar"); bundle1.assertAttributeEquals("Content-Type", "text/plain; charset=ISO-8859-1"); } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestInvokeHttpSSL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestInvokeHttpSSL.java index d155b74d48..b3cd9dc7a2 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestInvokeHttpSSL.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestInvokeHttpSSL.java @@ -17,7 +17,6 @@ package org.apache.nifi.processors.standard; -import org.apache.nifi.processors.standard.InvokeHTTP.Config; import org.apache.nifi.processors.standard.util.TestInvokeHttpCommon; import org.apache.nifi.ssl.StandardSSLContextService; import org.apache.nifi.util.TestRunners; @@ -63,7 +62,7 @@ public class TestInvokeHttpSSL extends TestInvokeHttpCommon { final StandardSSLContextService sslService = new StandardSSLContextService(); runner.addControllerService("ssl-context", sslService, sslProperties); runner.enableControllerService(sslService); - runner.setProperty(Config.PROP_SSL_CONTEXT_SERVICE, "ssl-context"); + runner.setProperty(InvokeHTTP.PROP_SSL_CONTEXT_SERVICE, "ssl-context"); server.clearHandlers(); } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestInvokeHttpCommon.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestInvokeHttpCommon.java index 88dfcdb056..b44f01522c 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestInvokeHttpCommon.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestInvokeHttpCommon.java @@ -24,9 +24,16 @@ import org.apache.nifi.provenance.ProvenanceEventRecord; import org.apache.nifi.provenance.ProvenanceEventType; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; +import org.eclipse.jetty.security.ConstraintSecurityHandler; +import org.eclipse.jetty.security.DefaultIdentityService; +import org.eclipse.jetty.security.HashLoginService; +import org.eclipse.jetty.security.ServerAuthException; +import org.eclipse.jetty.security.authentication.DigestAuthenticator; +import org.eclipse.jetty.server.Authentication; import org.eclipse.jetty.server.Handler; import org.eclipse.jetty.server.Request; import org.eclipse.jetty.server.handler.AbstractHandler; +import org.eclipse.jetty.util.security.Password; import org.junit.Assert; import org.junit.Test; @@ -67,7 +74,7 @@ public abstract class TestInvokeHttpCommon { final DateHandler dh = new DateHandler(); addHandler(dh); - runner.setProperty(InvokeHTTP.Config.PROP_URL, url); + runner.setProperty(InvokeHTTP.PROP_URL, url); createFlowFiles(runner); runner.run(); @@ -94,35 +101,356 @@ public abstract class TestInvokeHttpCommon { public void test200() throws Exception { addHandler(new GetOrHeadHandler()); - runner.setProperty(InvokeHTTP.Config.PROP_URL, url + "/status/200"); + runner.setProperty(InvokeHTTP.PROP_URL, url + "/status/200"); createFlowFiles(runner); + // Verify only one FlowFile gets created/sent + runner.run(); runner.run(); - runner.assertTransferCount(InvokeHTTP.Config.REL_SUCCESS_REQ, 1); - runner.assertTransferCount(InvokeHTTP.Config.REL_SUCCESS_RESP, 1); - runner.assertTransferCount(InvokeHTTP.Config.REL_RETRY, 0); - runner.assertTransferCount(InvokeHTTP.Config.REL_NO_RETRY, 0); - runner.assertTransferCount(InvokeHTTP.Config.REL_FAILURE, 0); + runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 1); + runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 1); + runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0); + runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0); + runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0); // expected in request status.code and status.message // original flow file (+attributes) - final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.Config.REL_SUCCESS_REQ).get(0); + final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_SUCCESS_REQ).get(0); bundle.assertContentEquals("Hello".getBytes("UTF-8")); - bundle.assertAttributeEquals(InvokeHTTP.Config.STATUS_CODE, "200"); - bundle.assertAttributeEquals(InvokeHTTP.Config.STATUS_MESSAGE, "OK"); + bundle.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200"); + bundle.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK"); bundle.assertAttributeEquals("Foo", "Bar"); // expected in response // status code, status message, all headers from server response --> ff attributes // server response message body into payload of ff - final MockFlowFile bundle1 = runner.getFlowFilesForRelationship(InvokeHTTP.Config.REL_SUCCESS_RESP).get(0); + final MockFlowFile bundle1 = runner.getFlowFilesForRelationship(InvokeHTTP.REL_RESPONSE).get(0); bundle1.assertContentEquals("/status/200".getBytes("UTF-8")); - bundle1.assertAttributeEquals(InvokeHTTP.Config.STATUS_CODE, "200"); - bundle1.assertAttributeEquals(InvokeHTTP.Config.STATUS_MESSAGE, "OK"); + bundle1.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200"); + bundle1.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK"); bundle1.assertAttributeEquals("Foo", "Bar"); bundle1.assertAttributeEquals("Content-Type", "text/plain; charset=ISO-8859-1"); + + final List provEvents = runner.getProvenanceEvents(); + assertEquals(2, provEvents.size()); + boolean forkEvent = false; + boolean fetchEvent = false; + for (final ProvenanceEventRecord event : provEvents) { + if (event.getEventType() == ProvenanceEventType.FORK) { + forkEvent = true; + } else if (event.getEventType() == ProvenanceEventType.FETCH) { + fetchEvent = true; + } + } + + assertTrue(forkEvent); + assertTrue(fetchEvent); + } + + @Test + public void testOutputResponseRegardless() throws Exception { + addHandler(new GetOrHeadHandler()); + + runner.setProperty(InvokeHTTP.PROP_URL, url + "/status/200"); + runner.setProperty(InvokeHTTP.PROP_METHOD, "POST"); + runner.setProperty(InvokeHTTP.PROP_OUTPUT_RESPONSE_REGARDLESS,"true"); + + createFlowFiles(runner); + + runner.run(); + + runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 0); + runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 1); + runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0); + runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 1); + runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0); + + // expected in request status.code and status.message + // original flow file (+attributes) + final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_NO_RETRY).get(0); + bundle.assertContentEquals("Hello".getBytes("UTF-8")); + bundle.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "404"); + bundle.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "Not Found"); + bundle.assertAttributeEquals("Foo", "Bar"); + + // expected in response + // status code, status message, all headers from server response --> ff attributes + // server response message body into payload of ff + final MockFlowFile bundle1 = runner.getFlowFilesForRelationship(InvokeHTTP.REL_RESPONSE).get(0); + bundle1.assertContentEquals("NO".getBytes("UTF-8")); + bundle1.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "404"); + bundle1.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "Not Found"); + bundle1.assertAttributeEquals("Foo", "Bar"); + bundle1.assertAttributeEquals("Content-Type", "text/plain; charset=ISO-8859-1"); + } + + @Test + public void testOutputResponseRegardlessWithOutputInAttribute() throws Exception { + addHandler(new GetOrHeadHandler()); + + runner.setProperty(InvokeHTTP.PROP_URL, url + "/status/200"); + runner.setProperty(InvokeHTTP.PROP_METHOD, "POST"); + runner.setProperty(InvokeHTTP.PROP_OUTPUT_RESPONSE_REGARDLESS,"true"); + runner.setProperty(InvokeHTTP.PROP_PUT_OUTPUT_IN_ATTRIBUTE,"outputBody"); + + createFlowFiles(runner); + + runner.run(); + + runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 0); + runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 1); + runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0); + runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 1); + runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0); + + // expected in request status.code and status.message + // original flow file (+attributes) + final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_NO_RETRY).get(0); + bundle.assertContentEquals("Hello".getBytes("UTF-8")); + bundle.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "404"); + bundle.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "Not Found"); + bundle.assertAttributeEquals("outputBody", "NO"); + bundle.assertAttributeEquals("Foo", "Bar"); + + // expected in response + // status code, status message, all headers from server response --> ff attributes + // server response message body into payload of ff + final MockFlowFile bundle1 = runner.getFlowFilesForRelationship(InvokeHTTP.REL_RESPONSE).get(0); + bundle1.assertContentEquals("NO".getBytes("UTF-8")); + bundle1.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "404"); + bundle1.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "Not Found"); + bundle1.assertAttributeEquals("Foo", "Bar"); + bundle1.assertAttributeEquals("Content-Type", "text/plain; charset=ISO-8859-1"); + } + + @Test + public void testOutputResponseRegardlessWithOutputInAttributeLarge() throws Exception { + addHandler(new GetLargeHandler()); + + runner.setProperty(InvokeHTTP.PROP_URL, url + "/status/200"); + runner.setProperty(InvokeHTTP.PROP_METHOD, "POST"); + runner.setProperty(InvokeHTTP.PROP_OUTPUT_RESPONSE_REGARDLESS,"true"); + runner.setProperty(InvokeHTTP.PROP_PUT_OUTPUT_IN_ATTRIBUTE,"outputBody"); + runner.setProperty(InvokeHTTP.PROP_PUT_ATTRIBUTE_MAX_LENGTH,"11"); + + createFlowFiles(runner); + + runner.run(); + + runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 0); + runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 1); + runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0); + runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 1); + runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0); + + // expected in request status.code and status.message + // original flow file (+attributes) + final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_NO_RETRY).get(0); + bundle.assertContentEquals("Hello".getBytes("UTF-8")); + bundle.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "404"); + bundle.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "Not Found"); + bundle.assertAttributeEquals("outputBody", "Lorem ipsum"); + bundle.assertAttributeEquals("Foo", "Bar"); + + // expected in response + // status code, status message, all headers from server response --> ff attributes + // server response message body into payload of ff + final MockFlowFile bundle1 = runner.getFlowFilesForRelationship(InvokeHTTP.REL_RESPONSE).get(0); + bundle1.assertContentEquals("Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. " + + "Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor " + + "in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, " + + "sunt in culpa qui officia deserunt mollit anim id est laborum."); + bundle1.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "404"); + bundle1.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "Not Found"); + bundle1.assertAttributeEquals("Foo", "Bar"); + bundle1.assertAttributeEquals("Content-Type", "text/plain; charset=ISO-8859-1"); + } + + + @Test + public void testMultipleSameHeaders() throws Exception { + addHandler(new GetMultipleHeaderHandler()); + + runner.setProperty(InvokeHTTP.PROP_URL, url + "/status/200"); + + createFlowFiles(runner); + + runner.run(); + + runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 1); + runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 1); + runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0); + runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0); + runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0); + + // expected in request status.code and status.message + // original flow file (+attributes) + final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_SUCCESS_REQ).get(0); + bundle.assertContentEquals("Hello".getBytes("UTF-8")); + bundle.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200"); + bundle.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK"); + bundle.assertAttributeEquals("Foo", "Bar"); + + // expected in response + // status code, status message, all headers from server response --> ff attributes + // server response message body into payload of ff + final MockFlowFile bundle1 = runner.getFlowFilesForRelationship(InvokeHTTP.REL_RESPONSE).get(0); + bundle1.assertContentEquals("/status/200".getBytes("UTF-8")); + bundle1.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200"); + bundle1.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK"); + bundle1.assertAttributeEquals("Foo", "Bar"); + bundle1.assertAttributeEquals("double", "1, 2"); + bundle1.assertAttributeEquals("Content-Type", "text/plain; charset=ISO-8859-1"); + } + + @Test + public void testPutResponseHeadersInRequest() throws Exception { + addHandler(new GetMultipleHeaderHandler()); + + runner.setProperty(InvokeHTTP.PROP_URL, url + "/status/200"); + runner.setProperty(InvokeHTTP.PROP_ADD_HEADERS_TO_REQUEST, "true"); + + createFlowFiles(runner); + + runner.run(); + + runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 1); + runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 1); + runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0); + runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0); + runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0); + + // expected in request status.code and status.message + // original flow file (+all attributes from response) + final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_SUCCESS_REQ).get(0); + bundle.assertContentEquals("Hello".getBytes("UTF-8")); + bundle.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200"); + bundle.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK"); + bundle.assertAttributeEquals("Foo", "Bar"); + bundle.assertAttributeEquals("double", "1, 2"); + bundle.assertAttributeEquals("Content-Type", "text/plain; charset=ISO-8859-1"); + + // expected in response + // status code, status message, all headers from server response --> ff attributes + // server response message body into payload of ff + final MockFlowFile bundle1 = runner.getFlowFilesForRelationship(InvokeHTTP.REL_RESPONSE).get(0); + bundle1.assertContentEquals("/status/200".getBytes("UTF-8")); + bundle1.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200"); + bundle1.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK"); + bundle1.assertAttributeEquals("Foo", "Bar"); + bundle1.assertAttributeEquals("double", "1, 2"); + bundle1.assertAttributeEquals("Content-Type", "text/plain; charset=ISO-8859-1"); + } + + @Test + public void testToRequestAttribute() throws Exception { + addHandler(new GetOrHeadHandler()); + + runner.setProperty(InvokeHTTP.PROP_URL, url + "/status/200"); + runner.setProperty(InvokeHTTP.PROP_PUT_OUTPUT_IN_ATTRIBUTE,"outputBody"); + + createFlowFiles(runner); + + runner.run(); + + runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 1); + runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 0); + runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0); + runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0); + runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0); + + // expected in request status.code, status.message and body of response in attribute + // original flow file (+attributes) + final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_SUCCESS_REQ).get(0); + bundle.assertContentEquals("Hello".getBytes("UTF-8")); + bundle.assertAttributeEquals("outputBody", "/status/200"); + bundle.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200"); + bundle.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK"); + bundle.assertAttributeEquals("Foo", "Bar"); + } + + @Test + public void testNoInput() throws Exception { + addHandler(new GetOrHeadHandler()); + + runner.setProperty(InvokeHTTP.PROP_URL, url + "/status/200"); + runner.setProperty(InvokeHTTP.PROP_METHOD,"GET"); + runner.setIncomingConnection(false); + runner.setNonLoopConnection(false); + + runner.run(); + + runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 0); + runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 1); + runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0); + runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0); + runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0); + + // expected in response + // status code, status message, all headers from server response --> ff attributes + // server response message body into payload of ff + final MockFlowFile bundle1 = runner.getFlowFilesForRelationship(InvokeHTTP.REL_RESPONSE).get(0); + bundle1.assertContentEquals("/status/200".getBytes("UTF-8")); + bundle1.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200"); + bundle1.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK"); + bundle1.assertAttributeEquals("Content-Type", "text/plain; charset=ISO-8859-1"); + } + + @Test + public void testNoInputFail() throws Exception { + addHandler(new GetOrHeadHandler()); + + runner.setProperty(InvokeHTTP.PROP_URL, url + "/status/200"); + runner.setProperty(InvokeHTTP.PROP_METHOD, "POST"); + runner.setIncomingConnection(false); + runner.setNonLoopConnection(false); + + runner.run(); + + runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 0); + runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 0); + runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0); + runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0); + runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0); + + runner.setProperty(InvokeHTTP.PROP_METHOD,"OPTION"); + + runner.run(); + + runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 0); + runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 0); + runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0); + runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0); + runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0); + } + + @Test + public void testNoInputSendToAttribute() throws Exception { + addHandler(new GetOrHeadHandler()); + + runner.setProperty(InvokeHTTP.PROP_URL, url + "/status/200"); + runner.setProperty(InvokeHTTP.PROP_PUT_OUTPUT_IN_ATTRIBUTE, "outputBody"); + runner.setIncomingConnection(false); + runner.setNonLoopConnection(false); + + runner.run(); + + runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 1); + runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 0); + runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0); + runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0); + runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0); + + // expected in request + // status code, status message, no ff content + // server response message body into attribute of ff + final MockFlowFile bundle1 = runner.getFlowFilesForRelationship(InvokeHTTP.REL_SUCCESS_REQ).get(0); + bundle1.assertContentEquals("".getBytes("UTF-8")); + bundle1.assertAttributeEquals("outputBody", "/status/200"); + bundle1.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200"); + bundle1.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK"); } @Test @@ -132,9 +460,9 @@ public abstract class TestInvokeHttpCommon { final String username = "basic_user"; final String password = "basic_password"; - runner.setProperty(InvokeHTTP.Config.PROP_URL, url + "/status/200"); - runner.setProperty(InvokeHTTP.Config.PROP_BASIC_AUTH_USERNAME, username); - runner.setProperty(InvokeHTTP.Config.PROP_BASIC_AUTH_PASSWORD, password); + runner.setProperty(InvokeHTTP.PROP_URL, url + "/status/200"); + runner.setProperty(InvokeHTTP.PROP_BASIC_AUTH_USERNAME, username); + runner.setProperty(InvokeHTTP.PROP_BASIC_AUTH_PASSWORD, password); final byte[] creds = String.format("%s:%s", username, password).getBytes(StandardCharsets.UTF_8); final String expAuth = String.format("Basic %s", new String(encodeBase64(creds))); @@ -142,28 +470,28 @@ public abstract class TestInvokeHttpCommon { runner.run(); - runner.assertTransferCount(InvokeHTTP.Config.REL_SUCCESS_REQ, 1); - runner.assertTransferCount(InvokeHTTP.Config.REL_SUCCESS_RESP, 1); - runner.assertTransferCount(InvokeHTTP.Config.REL_RETRY, 0); - runner.assertTransferCount(InvokeHTTP.Config.REL_NO_RETRY, 0); - runner.assertTransferCount(InvokeHTTP.Config.REL_FAILURE, 0); + runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 1); + runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 1); + runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0); + runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0); + runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0); // expected in request status.code and status.message // original flow file (+attributes) - final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.Config.REL_SUCCESS_REQ).get(0); + final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_SUCCESS_REQ).get(0); bundle.assertContentEquals("Hello".getBytes("UTF-8")); - bundle.assertAttributeEquals(InvokeHTTP.Config.STATUS_CODE, "200"); - bundle.assertAttributeEquals(InvokeHTTP.Config.STATUS_MESSAGE, "OK"); + bundle.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200"); + bundle.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK"); bundle.assertAttributeEquals("Foo", "Bar"); // expected in response // status code, status message, all headers from server response --> ff attributes // server response message body into payload of ff - final MockFlowFile bundle1 = runner.getFlowFilesForRelationship(InvokeHTTP.Config.REL_SUCCESS_RESP).get(0); + final MockFlowFile bundle1 = runner.getFlowFilesForRelationship(InvokeHTTP.REL_RESPONSE).get(0); final String bundle1Content = new String(bundle1.toByteArray(), StandardCharsets.UTF_8); assertTrue(bundle1Content.startsWith(expAuth)); // use startsWith instead of equals so we can ignore line endings - bundle1.assertAttributeEquals(InvokeHTTP.Config.STATUS_CODE, "200"); - bundle1.assertAttributeEquals(InvokeHTTP.Config.STATUS_MESSAGE, "OK"); + bundle1.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200"); + bundle1.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK"); bundle1.assertAttributeEquals("Foo", "Bar"); bundle1.assertAttributeEquals("Content-Type", "text/plain; charset=ISO-8859-1"); @@ -190,55 +518,140 @@ public abstract class TestInvokeHttpCommon { final String username = "basic_user"; final String password = "basic_password"; - runner.setProperty(InvokeHTTP.Config.PROP_URL, url + "/status/401"); - runner.setProperty(InvokeHTTP.Config.PROP_BASIC_AUTH_USERNAME, username); - runner.setProperty(InvokeHTTP.Config.PROP_BASIC_AUTH_PASSWORD, password); + runner.setProperty(InvokeHTTP.PROP_URL, url + "/status/401"); + runner.setProperty(InvokeHTTP.PROP_BASIC_AUTH_USERNAME, username); + runner.setProperty(InvokeHTTP.PROP_BASIC_AUTH_PASSWORD, password); createFlowFiles(runner); runner.run(); - runner.assertTransferCount(InvokeHTTP.Config.REL_SUCCESS_REQ, 0); - runner.assertTransferCount(InvokeHTTP.Config.REL_SUCCESS_RESP, 0); - runner.assertTransferCount(InvokeHTTP.Config.REL_RETRY, 0); - runner.assertTransferCount(InvokeHTTP.Config.REL_NO_RETRY, 1); - runner.assertTransferCount(InvokeHTTP.Config.REL_FAILURE, 0); + runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 0); + runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 0); + runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0); + runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 1); + runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0); // expected in request status.code and status.message // original flow file (+attributes) - final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.Config.REL_NO_RETRY).get(0); - bundle.assertAttributeEquals(InvokeHTTP.Config.STATUS_CODE, "401"); - bundle.assertAttributeEquals(InvokeHTTP.Config.STATUS_MESSAGE, "Unauthorized"); + final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_NO_RETRY).get(0); + bundle.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "401"); + bundle.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "Unauthorized"); bundle.assertAttributeEquals("Foo", "Bar"); final String actual = new String(bundle.toByteArray(), StandardCharsets.UTF_8); final String expected = "Hello"; Assert.assertEquals(expected, actual); - final String response = bundle.getAttribute(InvokeHTTP.Config.RESPONSE_BODY); - assertEquals(response, "Get off my lawn!"); + final String response = bundle.getAttribute(InvokeHTTP.RESPONSE_BODY); + assertEquals(response, "Get off my lawn!"+System.lineSeparator()); + } + + @Test + public void test200DigestAuth() throws Exception { + addHandler(new DigestAuthHandler()); + final String username = "basic_user"; + final String password = "basic_password"; + + runner.setProperty(InvokeHTTP.PROP_URL, url + "/status/200"); + runner.setProperty(InvokeHTTP.PROP_BASIC_AUTH_USERNAME, username); + runner.setProperty(InvokeHTTP.PROP_BASIC_AUTH_PASSWORD, password); + runner.setProperty(InvokeHTTP.PROP_DIGEST_AUTH,"true"); + + createFlowFiles(runner); + + runner.run(); + + runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 1); + runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 1); + runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0); + runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0); + runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0); + + //expected in request status.code and status.message + //original flow file (+attributes) + final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_SUCCESS_REQ).get(0); + bundle.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200"); + bundle.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK"); + bundle.assertAttributeEquals("Foo", "Bar"); + final String actual = new String(bundle.toByteArray(), StandardCharsets.UTF_8); + final String expected = "Hello"; + Assert.assertEquals(expected, actual); + + //expected in response + //status code, status message, all headers from server response --> ff attributes + //server response message body into payload of ff + final MockFlowFile bundle1 = runner.getFlowFilesForRelationship(InvokeHTTP.REL_RESPONSE).get(0); + bundle1.assertContentEquals(("DIGEST"+System.lineSeparator()).getBytes("UTF-8")); + bundle1.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200"); + bundle1.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK"); + bundle1.assertAttributeEquals("Foo", "Bar"); + bundle1.assertAttributeEquals("Content-Type", "text/plain; charset=ISO-8859-1"); + } + + @Test + public void test401DigestNotAuth() throws Exception { + addHandler(new DigestAuthHandler()); + + runner.setProperty(InvokeHTTP.PROP_URL, url + "/status/200"); + runner.setProperty(InvokeHTTP.PROP_DIGEST_AUTH,"false"); + runner.setProperty(InvokeHTTP.PROP_PUT_ATTRIBUTE_MAX_LENGTH,"512"); + + createFlowFiles(runner); + + runner.run(); + + runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 0); + runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 0); + runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0); + runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 1); + runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0); + + //expected in request status.code and status.message + //original flow file (+attributes) + final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_NO_RETRY).get(0); + bundle.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "401"); + bundle.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "Unauthorized"); + bundle.assertAttributeEquals("Foo", "Bar"); + final String actual = new String(bundle.toByteArray(), StandardCharsets.UTF_8); + final String expected = "Hello"; + Assert.assertEquals(expected, actual); + + final String response = bundle.getAttribute(InvokeHTTP.RESPONSE_BODY); + assertEquals("\n" + + "\n" + + "\n" + + "Error 401 \n" + + "\n" + + "\n" + + "

HTTP ERROR: 401

\n" + + "

Problem accessing /status/200. Reason:\n" + + "

    Unauthorized

\n" + + "
Powered by Jetty://\n" + + "\n" + + "\n", response); } @Test public void test500() throws Exception { addHandler(new GetOrHeadHandler()); - runner.setProperty(InvokeHTTP.Config.PROP_URL, url + "/status/500"); + runner.setProperty(InvokeHTTP.PROP_URL, url + "/status/500"); createFlowFiles(runner); runner.run(); - runner.assertTransferCount(InvokeHTTP.Config.REL_SUCCESS_REQ, 0); - runner.assertTransferCount(InvokeHTTP.Config.REL_SUCCESS_RESP, 0); - runner.assertTransferCount(InvokeHTTP.Config.REL_RETRY, 1); - runner.assertTransferCount(InvokeHTTP.Config.REL_NO_RETRY, 0); - runner.assertTransferCount(InvokeHTTP.Config.REL_FAILURE, 0); + runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 0); + runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 0); + runner.assertTransferCount(InvokeHTTP.REL_RETRY, 1); + runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0); + runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0); // expected in response - final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.Config.REL_RETRY).get(0); + final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_RETRY).get(0); final String actual = new String(bundle.toByteArray(), StandardCharsets.UTF_8); - bundle.assertAttributeEquals(InvokeHTTP.Config.STATUS_CODE, "500"); - bundle.assertAttributeEquals(InvokeHTTP.Config.STATUS_MESSAGE, "Server Error"); - bundle.assertAttributeEquals(InvokeHTTP.Config.RESPONSE_BODY, "/status/500"); + bundle.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "500"); + bundle.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "Server Error"); + bundle.assertAttributeEquals(InvokeHTTP.RESPONSE_BODY, "/status/500"); final String expected = "Hello"; Assert.assertEquals(expected, actual); @@ -250,23 +663,23 @@ public abstract class TestInvokeHttpCommon { public void test300() throws Exception { addHandler(new GetOrHeadHandler()); - runner.setProperty(InvokeHTTP.Config.PROP_URL, url + "/status/302"); + runner.setProperty(InvokeHTTP.PROP_URL, url + "/status/302"); createFlowFiles(runner); runner.run(); - runner.assertTransferCount(InvokeHTTP.Config.REL_SUCCESS_REQ, 0); - runner.assertTransferCount(InvokeHTTP.Config.REL_SUCCESS_RESP, 0); - runner.assertTransferCount(InvokeHTTP.Config.REL_RETRY, 0); - runner.assertTransferCount(InvokeHTTP.Config.REL_NO_RETRY, 1); - runner.assertTransferCount(InvokeHTTP.Config.REL_FAILURE, 0); + runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 0); + runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 0); + runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0); + runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 1); + runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0); // getMyFlowFiles(); // expected in response - final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.Config.REL_NO_RETRY).get(0); + final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_NO_RETRY).get(0); final String actual = new String(bundle.toByteArray(), StandardCharsets.UTF_8); - bundle.assertAttributeEquals(InvokeHTTP.Config.STATUS_CODE, "302"); - bundle.assertAttributeEquals(InvokeHTTP.Config.STATUS_MESSAGE, "Found"); + bundle.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "302"); + bundle.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "Found"); final String expected = "Hello"; Assert.assertEquals(expected, actual); bundle.assertAttributeEquals("Foo", "Bar"); @@ -277,23 +690,23 @@ public abstract class TestInvokeHttpCommon { public void test304() throws Exception { addHandler(new GetOrHeadHandler()); - runner.setProperty(InvokeHTTP.Config.PROP_URL, url + "/status/304"); + runner.setProperty(InvokeHTTP.PROP_URL, url + "/status/304"); createFlowFiles(runner); runner.run(); - runner.assertTransferCount(InvokeHTTP.Config.REL_SUCCESS_REQ, 0); - runner.assertTransferCount(InvokeHTTP.Config.REL_SUCCESS_RESP, 0); - runner.assertTransferCount(InvokeHTTP.Config.REL_RETRY, 0); - runner.assertTransferCount(InvokeHTTP.Config.REL_NO_RETRY, 1); - runner.assertTransferCount(InvokeHTTP.Config.REL_FAILURE, 0); + runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 0); + runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 0); + runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0); + runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 1); + runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0); // getMyFlowFiles(); // expected in response - final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.Config.REL_NO_RETRY).get(0); + final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_NO_RETRY).get(0); final String actual = new String(bundle.toByteArray(), StandardCharsets.UTF_8); - bundle.assertAttributeEquals(InvokeHTTP.Config.STATUS_CODE, "304"); - bundle.assertAttributeEquals(InvokeHTTP.Config.STATUS_MESSAGE, "Not Modified"); + bundle.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "304"); + bundle.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "Not Modified"); final String expected = "Hello"; Assert.assertEquals(expected, actual); bundle.assertAttributeEquals("Foo", "Bar"); @@ -304,24 +717,24 @@ public abstract class TestInvokeHttpCommon { public void test400() throws Exception { addHandler(new GetOrHeadHandler()); - runner.setProperty(InvokeHTTP.Config.PROP_URL, url + "/status/400"); + runner.setProperty(InvokeHTTP.PROP_URL, url + "/status/400"); createFlowFiles(runner); runner.run(); - runner.assertTransferCount(InvokeHTTP.Config.REL_SUCCESS_REQ, 0); - runner.assertTransferCount(InvokeHTTP.Config.REL_SUCCESS_RESP, 0); - runner.assertTransferCount(InvokeHTTP.Config.REL_RETRY, 0); - runner.assertTransferCount(InvokeHTTP.Config.REL_NO_RETRY, 1); - runner.assertTransferCount(InvokeHTTP.Config.REL_FAILURE, 0); + runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 0); + runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 0); + runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0); + runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 1); + runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0); // getMyFlowFiles(); // expected in response - final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.Config.REL_NO_RETRY).get(0); + final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_NO_RETRY).get(0); final String actual = new String(bundle.toByteArray(), StandardCharsets.UTF_8); - bundle.assertAttributeEquals(InvokeHTTP.Config.STATUS_CODE, "400"); - bundle.assertAttributeEquals(InvokeHTTP.Config.STATUS_MESSAGE, "Bad Request"); - bundle.assertAttributeEquals(InvokeHTTP.Config.RESPONSE_BODY, "/status/400"); + bundle.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "400"); + bundle.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "Bad Request"); + bundle.assertAttributeEquals(InvokeHTTP.RESPONSE_BODY, "/status/400"); final String expected = "Hello"; Assert.assertEquals(expected, actual); bundle.assertAttributeEquals("Foo", "Bar"); @@ -332,25 +745,25 @@ public abstract class TestInvokeHttpCommon { public void test412() throws Exception { addHandler(new GetOrHeadHandler()); - runner.setProperty(InvokeHTTP.Config.PROP_URL, url + "/status/412"); - runner.setProperty(InvokeHTTP.Config.PROP_METHOD, "GET"); + runner.setProperty(InvokeHTTP.PROP_URL, url + "/status/412"); + runner.setProperty(InvokeHTTP.PROP_METHOD, "GET"); createFlowFiles(runner); runner.run(); - runner.assertTransferCount(InvokeHTTP.Config.REL_SUCCESS_REQ, 0); - runner.assertTransferCount(InvokeHTTP.Config.REL_SUCCESS_RESP, 0); - runner.assertTransferCount(InvokeHTTP.Config.REL_RETRY, 0); - runner.assertTransferCount(InvokeHTTP.Config.REL_NO_RETRY, 1); - runner.assertTransferCount(InvokeHTTP.Config.REL_FAILURE, 0); + runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 0); + runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 0); + runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0); + runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 1); + runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0); // expected in response - final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.Config.REL_NO_RETRY).get(0); + final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_NO_RETRY).get(0); final String actual = new String(bundle.toByteArray(), StandardCharsets.UTF_8); - bundle.assertAttributeEquals(InvokeHTTP.Config.STATUS_CODE, "412"); - bundle.assertAttributeEquals(InvokeHTTP.Config.STATUS_MESSAGE, "Precondition Failed"); - bundle.assertAttributeEquals(InvokeHTTP.Config.RESPONSE_BODY, "/status/412"); + bundle.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "412"); + bundle.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "Precondition Failed"); + bundle.assertAttributeEquals(InvokeHTTP.RESPONSE_BODY, "/status/412"); final String expected = "Hello"; Assert.assertEquals(expected, actual); bundle.assertAttributeEquals("Foo", "Bar"); @@ -361,28 +774,28 @@ public abstract class TestInvokeHttpCommon { public void testHead() throws Exception { addHandler(new GetOrHeadHandler()); - runner.setProperty(InvokeHTTP.Config.PROP_METHOD, "HEAD"); - runner.setProperty(InvokeHTTP.Config.PROP_URL, url + "/status/200"); + runner.setProperty(InvokeHTTP.PROP_METHOD, "HEAD"); + runner.setProperty(InvokeHTTP.PROP_URL, url + "/status/200"); createFlowFiles(runner); runner.run(); - runner.assertTransferCount(InvokeHTTP.Config.REL_SUCCESS_REQ, 1); - runner.assertTransferCount(InvokeHTTP.Config.REL_SUCCESS_RESP, 1); - runner.assertTransferCount(InvokeHTTP.Config.REL_RETRY, 0); - runner.assertTransferCount(InvokeHTTP.Config.REL_NO_RETRY, 0); - runner.assertTransferCount(InvokeHTTP.Config.REL_FAILURE, 0); + runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 1); + runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 1); + runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0); + runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0); + runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0); - final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.Config.REL_SUCCESS_REQ).get(0); + final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_SUCCESS_REQ).get(0); bundle.assertContentEquals("Hello".getBytes("UTF-8")); - bundle.assertAttributeEquals(InvokeHTTP.Config.STATUS_CODE, "200"); - bundle.assertAttributeEquals(InvokeHTTP.Config.STATUS_MESSAGE, "OK"); + bundle.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200"); + bundle.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK"); bundle.assertAttributeEquals("Foo", "Bar"); - final MockFlowFile bundle1 = runner.getFlowFilesForRelationship(InvokeHTTP.Config.REL_SUCCESS_RESP).get(0); + final MockFlowFile bundle1 = runner.getFlowFilesForRelationship(InvokeHTTP.REL_RESPONSE).get(0); bundle1.assertContentEquals("".getBytes("UTF-8")); - bundle1.assertAttributeEquals(InvokeHTTP.Config.STATUS_CODE, "200"); - bundle1.assertAttributeEquals(InvokeHTTP.Config.STATUS_MESSAGE, "OK"); + bundle1.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200"); + bundle1.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK"); bundle1.assertAttributeEquals("Foo", "Bar"); bundle1.assertAttributeEquals("Content-Type", "text/plain"); final String actual1 = new String(bundle1.toByteArray(), StandardCharsets.UTF_8); @@ -394,28 +807,28 @@ public abstract class TestInvokeHttpCommon { public void testPost() throws Exception { addHandler(new PostHandler()); - runner.setProperty(InvokeHTTP.Config.PROP_METHOD, "POST"); - runner.setProperty(InvokeHTTP.Config.PROP_URL, url + "/post"); + runner.setProperty(InvokeHTTP.PROP_METHOD, "POST"); + runner.setProperty(InvokeHTTP.PROP_URL, url + "/post"); createFlowFiles(runner); runner.run(); - runner.assertTransferCount(InvokeHTTP.Config.REL_SUCCESS_REQ, 1); - runner.assertTransferCount(InvokeHTTP.Config.REL_SUCCESS_RESP, 1); - runner.assertTransferCount(InvokeHTTP.Config.REL_RETRY, 0); - runner.assertTransferCount(InvokeHTTP.Config.REL_NO_RETRY, 0); - runner.assertTransferCount(InvokeHTTP.Config.REL_FAILURE, 0); + runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 1); + runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 1); + runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0); + runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0); + runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0); - final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.Config.REL_SUCCESS_REQ).get(0); + final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_SUCCESS_REQ).get(0); bundle.assertContentEquals("Hello".getBytes("UTF-8")); - bundle.assertAttributeEquals(InvokeHTTP.Config.STATUS_CODE, "200"); - bundle.assertAttributeEquals(InvokeHTTP.Config.STATUS_MESSAGE, "OK"); + bundle.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200"); + bundle.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK"); bundle.assertAttributeEquals("Foo", "Bar"); - final MockFlowFile bundle1 = runner.getFlowFilesForRelationship(InvokeHTTP.Config.REL_SUCCESS_RESP).get(0); + final MockFlowFile bundle1 = runner.getFlowFilesForRelationship(InvokeHTTP.REL_RESPONSE).get(0); bundle1.assertContentEquals("".getBytes("UTF-8")); - bundle1.assertAttributeEquals(InvokeHTTP.Config.STATUS_CODE, "200"); - bundle1.assertAttributeEquals(InvokeHTTP.Config.STATUS_MESSAGE, "OK"); + bundle1.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200"); + bundle1.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK"); bundle1.assertAttributeEquals("Foo", "Bar"); bundle1.assertAttributeNotExists("Content-Type"); @@ -426,30 +839,30 @@ public abstract class TestInvokeHttpCommon { @Test public void testPut() throws Exception { - addHandler(new PostHandler()); + addHandler(new PutHandler()); - runner.setProperty(InvokeHTTP.Config.PROP_METHOD, "PUT"); - runner.setProperty(InvokeHTTP.Config.PROP_URL, url + "/post"); + runner.setProperty(InvokeHTTP.PROP_METHOD, "PUT"); + runner.setProperty(InvokeHTTP.PROP_URL, url + "/post"); createFlowFiles(runner); runner.run(); - runner.assertTransferCount(InvokeHTTP.Config.REL_SUCCESS_REQ, 1); - runner.assertTransferCount(InvokeHTTP.Config.REL_SUCCESS_RESP, 1); - runner.assertTransferCount(InvokeHTTP.Config.REL_RETRY, 0); - runner.assertTransferCount(InvokeHTTP.Config.REL_NO_RETRY, 0); - runner.assertTransferCount(InvokeHTTP.Config.REL_FAILURE, 0); + runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 1); + runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 1); + runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0); + runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0); + runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0); - final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.Config.REL_SUCCESS_REQ).get(0); + final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_SUCCESS_REQ).get(0); bundle.assertContentEquals("Hello".getBytes("UTF-8")); - bundle.assertAttributeEquals(InvokeHTTP.Config.STATUS_CODE, "200"); - bundle.assertAttributeEquals(InvokeHTTP.Config.STATUS_MESSAGE, "OK"); + bundle.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200"); + bundle.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK"); bundle.assertAttributeEquals("Foo", "Bar"); - final MockFlowFile bundle1 = runner.getFlowFilesForRelationship(InvokeHTTP.Config.REL_SUCCESS_RESP).get(0); + final MockFlowFile bundle1 = runner.getFlowFilesForRelationship(InvokeHTTP.REL_RESPONSE).get(0); bundle1.assertContentEquals("".getBytes("UTF-8")); - bundle1.assertAttributeEquals(InvokeHTTP.Config.STATUS_CODE, "200"); - bundle1.assertAttributeEquals(InvokeHTTP.Config.STATUS_MESSAGE, "OK"); + bundle1.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200"); + bundle1.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK"); bundle1.assertAttributeEquals("Foo", "Bar"); bundle1.assertAttributeNotExists("Content-Type"); @@ -462,28 +875,28 @@ public abstract class TestInvokeHttpCommon { public void testDelete() throws Exception { addHandler(new DeleteHandler()); - runner.setProperty(InvokeHTTP.Config.PROP_METHOD, "DELETE"); - runner.setProperty(InvokeHTTP.Config.PROP_URL, url + "/status/200"); + runner.setProperty(InvokeHTTP.PROP_METHOD, "DELETE"); + runner.setProperty(InvokeHTTP.PROP_URL, url + "/status/200"); createFlowFiles(runner); runner.run(); - runner.assertTransferCount(InvokeHTTP.Config.REL_SUCCESS_REQ, 1); - runner.assertTransferCount(InvokeHTTP.Config.REL_SUCCESS_RESP, 1); - runner.assertTransferCount(InvokeHTTP.Config.REL_RETRY, 0); - runner.assertTransferCount(InvokeHTTP.Config.REL_NO_RETRY, 0); - runner.assertTransferCount(InvokeHTTP.Config.REL_FAILURE, 0); + runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 1); + runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 1); + runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0); + runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0); + runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0); - final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.Config.REL_SUCCESS_REQ).get(0); + final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_SUCCESS_REQ).get(0); bundle.assertContentEquals("Hello".getBytes("UTF-8")); - bundle.assertAttributeEquals(InvokeHTTP.Config.STATUS_CODE, "200"); - bundle.assertAttributeEquals(InvokeHTTP.Config.STATUS_MESSAGE, "OK"); + bundle.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200"); + bundle.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK"); bundle.assertAttributeEquals("Foo", "Bar"); - final MockFlowFile bundle1 = runner.getFlowFilesForRelationship(InvokeHTTP.Config.REL_SUCCESS_RESP).get(0); + final MockFlowFile bundle1 = runner.getFlowFilesForRelationship(InvokeHTTP.REL_RESPONSE).get(0); bundle1.assertContentEquals("".getBytes("UTF-8")); - bundle1.assertAttributeEquals(InvokeHTTP.Config.STATUS_CODE, "200"); - bundle1.assertAttributeEquals(InvokeHTTP.Config.STATUS_MESSAGE, "OK"); + bundle1.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200"); + bundle1.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK"); bundle1.assertAttributeEquals("Foo", "Bar"); final String actual1 = new String(bundle1.toByteArray(), StandardCharsets.UTF_8); final String expected1 = ""; @@ -494,28 +907,28 @@ public abstract class TestInvokeHttpCommon { public void testOptions() throws Exception { addHandler(new OptionsHandler()); - runner.setProperty(InvokeHTTP.Config.PROP_METHOD, "OPTIONS"); - runner.setProperty(InvokeHTTP.Config.PROP_URL, url + "/status/200"); + runner.setProperty(InvokeHTTP.PROP_METHOD, "OPTIONS"); + runner.setProperty(InvokeHTTP.PROP_URL, url + "/status/200"); createFlowFiles(runner); runner.run(); - runner.assertTransferCount(InvokeHTTP.Config.REL_SUCCESS_REQ, 1); - runner.assertTransferCount(InvokeHTTP.Config.REL_SUCCESS_RESP, 1); - runner.assertTransferCount(InvokeHTTP.Config.REL_RETRY, 0); - runner.assertTransferCount(InvokeHTTP.Config.REL_NO_RETRY, 0); - runner.assertTransferCount(InvokeHTTP.Config.REL_FAILURE, 0); + runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 1); + runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 1); + runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0); + runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0); + runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0); - final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.Config.REL_SUCCESS_REQ).get(0); + final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_SUCCESS_REQ).get(0); bundle.assertContentEquals("Hello".getBytes("UTF-8")); - bundle.assertAttributeEquals(InvokeHTTP.Config.STATUS_CODE, "200"); - bundle.assertAttributeEquals(InvokeHTTP.Config.STATUS_MESSAGE, "OK"); + bundle.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200"); + bundle.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK"); bundle.assertAttributeEquals("Foo", "Bar"); - final MockFlowFile bundle1 = runner.getFlowFilesForRelationship(InvokeHTTP.Config.REL_SUCCESS_RESP).get(0); + final MockFlowFile bundle1 = runner.getFlowFilesForRelationship(InvokeHTTP.REL_RESPONSE).get(0); bundle1.assertContentEquals("/status/200".getBytes("UTF-8")); - bundle1.assertAttributeEquals(InvokeHTTP.Config.STATUS_CODE, "200"); - bundle1.assertAttributeEquals(InvokeHTTP.Config.STATUS_MESSAGE, "OK"); + bundle1.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200"); + bundle1.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK"); bundle1.assertAttributeEquals("Foo", "Bar"); } @@ -523,34 +936,36 @@ public abstract class TestInvokeHttpCommon { public void testSendAttributes() throws Exception { addHandler(new AttributesSentHandler()); - runner.setProperty(InvokeHTTP.Config.PROP_URL, url + "/status/200"); - runner.setProperty(InvokeHTTP.Config.PROP_ATTRIBUTES_TO_SEND, "Foo"); + runner.setProperty(InvokeHTTP.PROP_URL, url + "/status/200"); + runner.setProperty(InvokeHTTP.PROP_ATTRIBUTES_TO_SEND, "F.*"); + runner.setProperty("dynamicHeader","yes!"); createFlowFiles(runner); runner.run(); - runner.assertTransferCount(InvokeHTTP.Config.REL_SUCCESS_REQ, 1); - runner.assertTransferCount(InvokeHTTP.Config.REL_SUCCESS_RESP, 1); - runner.assertTransferCount(InvokeHTTP.Config.REL_RETRY, 0); - runner.assertTransferCount(InvokeHTTP.Config.REL_NO_RETRY, 0); - runner.assertTransferCount(InvokeHTTP.Config.REL_FAILURE, 0); + runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 1); + runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 1); + runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0); + runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0); + runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0); //expected in request status.code and status.message //original flow file (+attributes) - final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.Config.REL_SUCCESS_REQ).get(0); + final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_SUCCESS_REQ).get(0); bundle.assertContentEquals("Hello".getBytes("UTF-8")); - bundle.assertAttributeEquals(InvokeHTTP.Config.STATUS_CODE, "200"); - bundle.assertAttributeEquals(InvokeHTTP.Config.STATUS_MESSAGE, "OK"); + bundle.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200"); + bundle.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK"); bundle.assertAttributeEquals("Foo", "Bar"); //expected in response //status code, status message, all headers from server response --> ff attributes //server response message body into payload of ff - final MockFlowFile bundle1 = runner.getFlowFilesForRelationship(InvokeHTTP.Config.REL_SUCCESS_RESP).get(0); + final MockFlowFile bundle1 = runner.getFlowFilesForRelationship(InvokeHTTP.REL_RESPONSE).get(0); bundle1.assertContentEquals("Bar".getBytes("UTF-8")); - bundle1.assertAttributeEquals(InvokeHTTP.Config.STATUS_CODE, "200"); - bundle1.assertAttributeEquals(InvokeHTTP.Config.STATUS_MESSAGE, "OK"); + bundle1.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200"); + bundle1.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK"); + bundle1.assertAttributeEquals("dynamicHeader","yes!"); bundle1.assertAttributeEquals("Foo", "Bar"); bundle1.assertAttributeEquals("Content-Type", "text/plain; charset=ISO-8859-1"); } @@ -559,19 +974,19 @@ public abstract class TestInvokeHttpCommon { public void testReadTimeout() throws Exception { addHandler(new ReadTimeoutHandler()); - runner.setProperty(InvokeHTTP.Config.PROP_URL, url + "/status/200"); - runner.setProperty(InvokeHTTP.Config.PROP_READ_TIMEOUT, "5 secs"); + runner.setProperty(InvokeHTTP.PROP_URL, url + "/status/200"); + runner.setProperty(InvokeHTTP.PROP_READ_TIMEOUT, "5 secs"); createFlowFiles(runner); runner.run(); - runner.assertTransferCount(InvokeHTTP.Config.REL_SUCCESS_REQ, 0); - runner.assertTransferCount(InvokeHTTP.Config.REL_SUCCESS_RESP, 0); - runner.assertTransferCount(InvokeHTTP.Config.REL_RETRY, 0); - runner.assertTransferCount(InvokeHTTP.Config.REL_NO_RETRY, 0); - runner.assertTransferCount(InvokeHTTP.Config.REL_FAILURE, 1); + runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 0); + runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 0); + runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0); + runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0); + runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 1); - final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.Config.REL_FAILURE).get(0); + final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_FAILURE).get(0); final String actual = new String(bundle.toByteArray(), StandardCharsets.UTF_8); final String expected = "Hello"; @@ -586,17 +1001,17 @@ public abstract class TestInvokeHttpCommon { // this is the bad urls final String badurlport = "http://localhost:" + 445; - runner.setProperty(InvokeHTTP.Config.PROP_URL, badurlport + "/doesnotExist"); + runner.setProperty(InvokeHTTP.PROP_URL, badurlport + "/doesnotExist"); createFlowFiles(runner); runner.run(); - runner.assertTransferCount(InvokeHTTP.Config.REL_SUCCESS_REQ, 0); - runner.assertTransferCount(InvokeHTTP.Config.REL_SUCCESS_RESP, 0); - runner.assertTransferCount(InvokeHTTP.Config.REL_RETRY, 0); - runner.assertTransferCount(InvokeHTTP.Config.REL_NO_RETRY, 0); - runner.assertTransferCount(InvokeHTTP.Config.REL_FAILURE, 1); + runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 0); + runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 0); + runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0); + runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0); + runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 1); - final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.Config.REL_FAILURE).get(0); + final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_FAILURE).get(0); final String actual = new String(bundle.toByteArray(), StandardCharsets.UTF_8); final String expected = "Hello"; @@ -610,17 +1025,17 @@ public abstract class TestInvokeHttpCommon { final String badurlhost = "http://localhOOst:" + 445; - runner.setProperty(InvokeHTTP.Config.PROP_URL, badurlhost + "/doesnotExist"); + runner.setProperty(InvokeHTTP.PROP_URL, badurlhost + "/doesnotExist"); createFlowFiles(runner); runner.run(); - runner.assertTransferCount(InvokeHTTP.Config.REL_SUCCESS_REQ, 0); - runner.assertTransferCount(InvokeHTTP.Config.REL_SUCCESS_RESP, 0); - runner.assertTransferCount(InvokeHTTP.Config.REL_RETRY, 0); - runner.assertTransferCount(InvokeHTTP.Config.REL_NO_RETRY, 0); - runner.assertTransferCount(InvokeHTTP.Config.REL_FAILURE, 1); + runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 0); + runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 0); + runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0); + runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0); + runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 1); - final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.Config.REL_FAILURE).get(0); + final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_FAILURE).get(0); final String actual = new String(bundle.toByteArray(), StandardCharsets.UTF_8); final String expected = "Hello"; @@ -628,6 +1043,46 @@ public abstract class TestInvokeHttpCommon { bundle.assertAttributeEquals("Foo", "Bar"); } + @Test + public void testArbitraryRequest() throws Exception { + addHandler(new FetchHandler()); + + runner.setProperty(InvokeHTTP.PROP_URL, url + "/status/200"); + runner.setProperty(InvokeHTTP.PROP_METHOD,"FETCH"); + + createFlowFiles(runner); + + runner.run(); + + runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 1); + runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 1); + runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0); + runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0); + runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0); + + //expected in request status.code and status.message + //original flow file (+attributes) + final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_SUCCESS_REQ).get(0); + bundle.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200"); + bundle.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK"); + final String actual = new String(bundle.toByteArray(), StandardCharsets.UTF_8); + final String expected = "Hello"; + Assert.assertEquals(expected, actual); + bundle.assertAttributeEquals("Foo", "Bar"); + + //expected in response + //status code, status message, all headers from server response --> ff attributes + //server response message body into payload of ff + final MockFlowFile bundle1 = runner.getFlowFilesForRelationship(InvokeHTTP.REL_RESPONSE).get(0); + bundle1.assertContentEquals("/status/200".getBytes("UTF-8")); + bundle1.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200"); + bundle1.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK"); + bundle1.assertAttributeEquals("Foo", "Bar"); + bundle1.assertAttributeEquals("Content-Type", "text/plain; charset=ISO-8859-1"); + final String actual1 = new String(bundle1.toByteArray(), StandardCharsets.UTF_8); + final String expected1 = "/status/200"; + Assert.assertEquals(expected1, actual1); + } public static void createFlowFiles(final TestRunner testRunner) throws UnsupportedEncodingException { final Map attributes = new HashMap<>(); @@ -638,7 +1093,7 @@ public abstract class TestInvokeHttpCommon { } - private static class DateHandler extends AbstractHandler { + protected static class DateHandler extends AbstractHandler { private String dateString; @@ -654,7 +1109,7 @@ public abstract class TestInvokeHttpCommon { } } - private static class PostHandler extends AbstractHandler { + public static class PostHandler extends AbstractHandler { @Override public void handle(String target, Request baseRequest, @@ -663,15 +1118,42 @@ public abstract class TestInvokeHttpCommon { baseRequest.setHandled(true); - assertEquals("/post", target); - - final String body = request.getReader().readLine(); - assertEquals("Hello", body); + if("POST".equals(request.getMethod())) { + assertEquals("application/plain-text",request.getHeader("Content-Type")); + final String body = request.getReader().readLine(); + assertEquals("Hello", body); + } else { + response.setStatus(404); + response.setContentType("text/plain"); + response.setContentLength(0); + } } } - private static class GetOrHeadHandler extends AbstractHandler { + public static class PutHandler extends AbstractHandler { + + @Override + public void handle(String target, Request baseRequest, + HttpServletRequest request, HttpServletResponse response) + throws IOException, ServletException { + + baseRequest.setHandled(true); + + if("PUT".equalsIgnoreCase(request.getMethod())) { + assertEquals("application/plain-text",request.getHeader("Content-Type")); + final String body = request.getReader().readLine(); + assertEquals("Hello", body); + } else { + response.setStatus(404); + response.setContentType("text/plain"); + response.setContentLength(0); + } + + } + } + + public static class GetOrHeadHandler extends AbstractHandler { @Override public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException { @@ -688,12 +1170,91 @@ public abstract class TestInvokeHttpCommon { writer.print(target); writer.flush(); } + } else if(!"HEAD".equalsIgnoreCase(request.getMethod())) { + response.setStatus(404); + response.setContentType("text/plain"); + String body = "NO"; + response.setContentLength(body.length()); + response.setContentType("text/plain"); + + try (PrintWriter writer = response.getWriter()) { + writer.print(body); + writer.flush(); + } } } } - private static class DeleteHandler extends AbstractHandler { + public static class GetLargeHandler extends AbstractHandler { + + @Override + public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException { + baseRequest.setHandled(true); + + final int status = Integer.valueOf(target.substring("/status".length() + 1)); + response.setStatus(status); + + response.setContentType("text/plain"); + response.setContentLength(target.length()); + + if ("GET".equalsIgnoreCase(request.getMethod())) { + try (PrintWriter writer = response.getWriter()) { + writer.print(target); + writer.flush(); + } + } else { + response.setStatus(404); + response.setContentType("text/plain"); + + //Lorem Ipsum + String body = "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. " + + "Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor " + + "in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, " + + "sunt in culpa qui officia deserunt mollit anim id est laborum."; + + response.setContentLength(body.length()); + response.setContentType("text/plain"); + + try (PrintWriter writer = response.getWriter()) { + writer.print(body); + writer.flush(); + } + } + + } + } + + public static class GetMultipleHeaderHandler extends AbstractHandler { + + @Override + public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException { + baseRequest.setHandled(true); + + final int status = Integer.valueOf(target.substring("/status".length() + 1)); + response.setStatus(status); + + response.setContentType("text/plain"); + response.setContentLength(target.length()); + + if ("GET".equalsIgnoreCase(request.getMethod())) { + response.addHeader("double", "1"); + response.addHeader("double", "2"); + + try (PrintWriter writer = response.getWriter()) { + writer.print(target); + writer.flush(); + } + } else { + response.setStatus(404); + response.setContentType("text/plain"); + response.setContentLength(0); + } + + } + } + + public static class DeleteHandler extends AbstractHandler { @Override public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException { @@ -711,7 +1272,7 @@ public abstract class TestInvokeHttpCommon { } } - private static class OptionsHandler extends AbstractHandler { + public static class OptionsHandler extends AbstractHandler { @Override public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException { @@ -735,7 +1296,7 @@ public abstract class TestInvokeHttpCommon { } } - private static class AttributesSentHandler extends AbstractHandler { + public static class AttributesSentHandler extends AbstractHandler { @Override public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException { @@ -743,12 +1304,12 @@ public abstract class TestInvokeHttpCommon { if ("Get".equalsIgnoreCase(request.getMethod())) { String headerValue = request.getHeader("Foo"); + response.setHeader("dynamicHeader",request.getHeader("dynamicHeader")); final int status = Integer.valueOf(target.substring("/status".length() + 1)); response.setStatus(status); response.setContentLength(headerValue.length()); response.setContentType("text/plain"); - try (PrintWriter writer = response.getWriter()) { writer.print(headerValue); writer.flush(); @@ -761,7 +1322,7 @@ public abstract class TestInvokeHttpCommon { } } - private static class ReadTimeoutHandler extends AbstractHandler { + public static class ReadTimeoutHandler extends AbstractHandler { @Override public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException { @@ -792,9 +1353,7 @@ public abstract class TestInvokeHttpCommon { } } - - - private static class BasicAuthHandler extends AbstractHandler { + public static class BasicAuthHandler extends AbstractHandler { private String authString; @@ -827,4 +1386,66 @@ public abstract class TestInvokeHttpCommon { } } + public static class DigestAuthHandler extends AbstractHandler { + + private DigestAuthenticator digestAuthenticator; + + private DigestAuthHandler() { + digestAuthenticator = new DigestAuthenticator(); + ConstraintSecurityHandler securityHandler = new ConstraintSecurityHandler(); + + HashLoginService hashLoginService = new HashLoginService("realm"); + hashLoginService.putUser("basic_user", new Password("basic_password"), new String[]{"realm"}); + securityHandler.setLoginService(hashLoginService); + securityHandler.setIdentityService(new DefaultIdentityService()); + digestAuthenticator.setConfiguration(securityHandler); + } + + + @Override + public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse + response)throws IOException, ServletException { + baseRequest.setHandled(true); + + try { + Authentication authentication = digestAuthenticator.validateRequest(request, response, true); + + if (authentication instanceof Authentication.User) { + response.setContentType("text/plain"); + Authentication.User user = (Authentication.User) authentication; + response.getWriter().println(user.getAuthMethod()); + } else if (authentication instanceof Authentication.ResponseSent) { + Authentication.ResponseSent responseSent = (Authentication.ResponseSent) authentication; + } + } catch (ServerAuthException e) { + e.printStackTrace(); + } + } + } + + public static class FetchHandler extends AbstractHandler { + + @Override + public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException { + baseRequest.setHandled(true); + + + if ("Fetch".equalsIgnoreCase(request.getMethod())) { + final int status = Integer.valueOf(target.substring("/status".length() + 1)); + response.setStatus(status); + response.setContentType("text/plain"); + response.setContentLength(target.length()); + + try (PrintWriter writer = response.getWriter()) { + writer.print(target); + writer.flush(); + } + } else { + + response.setStatus(404); + response.setContentType("text/plain"); + response.setContentLength(target.length()); + } + } + } } diff --git a/pom.xml b/pom.xml index 8a8cdb09a7..070881223e 100644 --- a/pom.xml +++ b/pom.xml @@ -97,6 +97,44 @@ 12.0.1 2.2.0 + + + + + central + + Maven Repository + https://repo1.maven.org/maven2 + + true + + + false + + + + apache-repo + Apache Repository + https://repository.apache.org/content/repositories/releases + + true + + + false + + + + jcenter + http://jcenter.bintray.com + + false + + + true + + + +