dynamicPropertyNames = new HashSet<>();
/**
- * A single invocation of an HTTP request/response from the InvokeHTTP processor. This class encapsulates the entirety of the flowfile processing.
- *
- * This class is not thread safe and is created new for every flowfile processed.
+ * Pattern used to compute RFC 2616 Dates (#sec3.3.1). This format is used by the HTTP Date header and is optionally sent by the processor. This date is effectively an RFC 822/1123 date
+ * string, but HTTP requires it to be in GMT (preferring the literal 'GMT' string).
*/
- private static class Transaction implements Config {
+ private static final String RFC_1123 = "EEE, dd MMM yyyy HH:mm:ss 'GMT'";
+ private static final DateTimeFormatter DATE_FORMAT = DateTimeFormat.forPattern(RFC_1123).withLocale(Locale.US).withZoneUTC();
- /**
- * Pattern used to compute RFC 2616 Dates (#sec3.3.1). This format is used by the HTTP Date header and is optionally sent by the processor. This date is effectively an RFC 822/1123 date
- * string, but HTTP requires it to be in GMT (preferring the literal 'GMT' string).
- */
- private static final String rfc1123 = "EEE, dd MMM yyyy HH:mm:ss 'GMT'";
- private static final DateTimeFormatter dateFormat = DateTimeFormat.forPattern(rfc1123).withLocale(Locale.US).withZoneUTC();
+ private final AtomicReference okHttpClientAtomicReference = new AtomicReference<>();
- /**
- * Every request/response cycle from this client has a unique transaction id which will be stored as a flowfile attribute. This generator is used to create the id.
- */
- private static final AtomicLong txIdGenerator = new AtomicLong();
+ public static final String DEFAULT_CONTENT_TYPE = "application/octet-stream";
- private static final Charset utf8 = Charset.forName("UTF-8");
+ @Override
+ protected List getSupportedPropertyDescriptors() {
+ return PROPERTIES;
+ }
- private final ProcessorLog logger;
- private final SSLContext sslContext;
- private final Pattern attributesToSend;
- private final ProcessContext context;
- private final ProcessSession session;
+ @Override
+ protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(String propertyDescriptorName) {
+ return new PropertyDescriptor.Builder()
+ .required(false)
+ .name(propertyDescriptorName)
+ .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING, true))
+ .dynamic(true)
+ .expressionLanguageSupported(true)
+ .build();
+ }
- private final long txId = txIdGenerator.incrementAndGet();
- private final long startNanos = System.nanoTime();
+ @Override
+ public Set getRelationships() {
+ return RELATIONSHIPS;
+ }
- private FlowFile request;
- private FlowFile response;
- private HttpURLConnection conn;
+ private volatile Pattern regexAttributesToSend = null;
- private int statusCode;
- private String statusMessage;
-
- public Transaction(
- final ProcessorLog logger,
- final SSLContext sslContext,
- final Pattern attributesToSend,
- final ProcessContext context,
- final ProcessSession session,
- final FlowFile request) {
-
- this.logger = logger;
- this.sslContext = sslContext;
- this.attributesToSend = attributesToSend;
- this.context = context;
- this.session = session;
- this.request = request;
- }
-
-
- public void process() {
- try {
- openConnection();
- sendRequest();
- readResponse();
- transfer();
- } catch (final Exception e) {
- // log exception
- logger.error("Routing to {} due to exception: {}", new Object[] {REL_FAILURE.getName(), e}, e);
-
- // penalize
- request = session.penalize(request);
-
- // transfer original to failure
- session.transfer(request, REL_FAILURE);
-
- // cleanup response flowfile, if applicable
- try {
- if (response != null) {
- session.remove(response);
- }
- } catch (final Exception e1) {
- logger.error("Could not cleanup response flowfile due to exception: {}", new Object[] {e1}, e1);
+ @Override
+ public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
+ if (descriptor.isDynamic()) {
+ final Set newDynamicPropertyNames = new HashSet<>(dynamicPropertyNames);
+ if (newValue == null) {
+ newDynamicPropertyNames.remove(descriptor.getName());
+ } else if (oldValue == null) { // new property
+ newDynamicPropertyNames.add(descriptor.getName());
+ }
+ this.dynamicPropertyNames = Collections.unmodifiableSet(newDynamicPropertyNames);
+ } else {
+ // compile the attributes-to-send filter pattern
+ if (PROP_ATTRIBUTES_TO_SEND.getName().equalsIgnoreCase(descriptor.getName())) {
+ if (newValue.isEmpty()) {
+ regexAttributesToSend = null;
+ } else {
+ final String trimmedValue = StringUtils.trimToEmpty(newValue);
+ regexAttributesToSend = Pattern.compile(trimmedValue);
}
}
}
+ }
- private void openConnection() throws IOException {
+ @Override
+ protected Collection customValidate(final ValidationContext validationContext) {
+ final List results = new ArrayList<>(1);
+ final boolean proxyHostSet = validationContext.getProperty(PROP_PROXY_HOST).isSet();
+ final boolean proxyPortSet = validationContext.getProperty(PROP_PROXY_PORT).isSet();
+
+ if ((proxyHostSet && !proxyPortSet) || (!proxyHostSet && proxyPortSet)) {
+ results.add(new ValidationResult.Builder().subject("Proxy Host and Port").valid(false).explanation("If Proxy Host or Proxy Port is set, both must be set").build());
+ }
+
+ return results;
+ }
+
+ @OnScheduled
+ public void setUpClient(final ProcessContext context) throws IOException {
+ okHttpClientAtomicReference.set(null);
+
+ OkHttpClient okHttpClient = new OkHttpClient();
+
+ // Add a proxy if set
+ final String proxyHost = context.getProperty(PROP_PROXY_HOST).getValue();
+ final Integer proxyPort = context.getProperty(PROP_PROXY_PORT).asInteger();
+ if (proxyHost != null && proxyPort != null) {
+ final Proxy proxy = new Proxy(Type.HTTP, new InetSocketAddress(proxyHost, proxyPort));
+ okHttpClient.setProxy(proxy);
+ }
+
+ // Set timeouts
+ okHttpClient.setConnectTimeout((context.getProperty(PROP_CONNECT_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue()), TimeUnit.MILLISECONDS);
+ okHttpClient.setReadTimeout(context.getProperty(PROP_READ_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue(), TimeUnit.MILLISECONDS);
+
+ // Set whether to follow redirects
+ okHttpClient.setFollowRedirects(context.getProperty(PROP_FOLLOW_REDIRECTS).asBoolean());
+
+ final SSLContextService sslService = context.getProperty(PROP_SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
+ final SSLContext sslContext = sslService == null ? null : sslService.createSSLContext(ClientAuth.NONE);
+
+ // check if the ssl context is set and add the factory if so
+ if (sslContext != null) {
+ okHttpClient.setSslSocketFactory(sslContext.getSocketFactory());
+ }
+
+ // check the trusted hostname property and override the HostnameVerifier
+ String trustedHostname = trimToEmpty(context.getProperty(PROP_TRUSTED_HOSTNAME).getValue());
+ if (!trustedHostname.isEmpty()) {
+ okHttpClient.setHostnameVerifier(new OverrideHostnameVerifier(trustedHostname, okHttpClient.getHostnameVerifier()));
+ }
+
+ final String authUser = trimToEmpty(context.getProperty(PROP_BASIC_AUTH_USERNAME).getValue());
+
+ // If the username/password properties are set then check if digest auth is being used
+ if (!authUser.isEmpty() && "true".equalsIgnoreCase(context.getProperty(PROP_DIGEST_AUTH).getValue())) {
+ final String authPass = trimToEmpty(context.getProperty(PROP_BASIC_AUTH_PASSWORD).getValue());
+
+ /*
+ * Currently OkHttp doesn't have built-in Digest Auth Support. The ticket for adding it is here:
+ * https://github.com/square/okhttp/issues/205#issuecomment-154047052
+ * Once added this should be refactored to use the built in support. For now, a third party lib is needed.
+ */
+ final Map authCache = new ConcurrentHashMap<>();
+
+ com.burgstaller.okhttp.digest.Credentials credentials = new com.burgstaller.okhttp.digest.Credentials(authUser, authPass);
+
+ final DigestAuthenticator digestAuthenticator = new DigestAuthenticator(credentials);
+
+ DispatchingAuthenticator authenticator = new DispatchingAuthenticator.Builder()
+ .with("Digest", digestAuthenticator)
+ .build();
+
+ okHttpClient.interceptors().add(new AuthenticationCacheInterceptor(authCache));
+ okHttpClient.setAuthenticator(new CachingAuthenticatorDecorator(authenticator, authCache));
+ }
+
+ okHttpClientAtomicReference.set(okHttpClient);
+ }
+
+ @Override
+ public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+ OkHttpClient okHttpClient = okHttpClientAtomicReference.get();
+
+ FlowFile requestFlowFile = session.get();
+
+ // Checking to see if the property to put the body of the response in an attribute was set
+ boolean putToAttribute = context.getProperty(PROP_PUT_OUTPUT_IN_ATTRIBUTE).isSet();
+ if (requestFlowFile == null) {
+ if(context.hasNonLoopConnection()){
+ return;
+ }
+
+ String request = context.getProperty(PROP_METHOD).evaluateAttributeExpressions().getValue().toUpperCase();
+ if ("POST".equals(request) || "PUT".equals(request)) {
+ return;
+ } else if (putToAttribute) {
+ requestFlowFile = session.create();
+ }
+ }
+
+ // Setting some initial variables
+ final int maxAttributeSize = context.getProperty(PROP_PUT_ATTRIBUTE_MAX_LENGTH).asInteger();
+ final ProcessorLog logger = getLogger();
+
+ // Every request/response cycle has a unique transaction id which will be stored as a flowfile attribute.
+ final UUID txId = UUID.randomUUID();
+
+ FlowFile responseFlowFile = null;
+ try {
// read the url property from the context
- final String urlstr = trimToEmpty(context.getProperty(PROP_URL).evaluateAttributeExpressions(request).getValue());
+ final String urlstr = trimToEmpty(context.getProperty(PROP_URL).evaluateAttributeExpressions(requestFlowFile).getValue());
final URL url = new URL(urlstr);
- final String authuser = trimToEmpty(context.getProperty(PROP_BASIC_AUTH_USERNAME).getValue());
- final String authpass = trimToEmpty(context.getProperty(PROP_BASIC_AUTH_PASSWORD).getValue());
- String authstrencoded = null;
- if (!authuser.isEmpty()) {
- String authstr = authuser + ":" + authpass;
- byte[] bytestrencoded = Base64.encodeBase64(authstr.getBytes(StandardCharsets.UTF_8));
- authstrencoded = new String(bytestrencoded, StandardCharsets.UTF_8);
- }
-
- // create the connection
- final String proxyHost = context.getProperty(PROP_PROXY_HOST).getValue();
- final Integer proxyPort = context.getProperty(PROP_PROXY_PORT).asInteger();
- if (proxyHost == null || proxyPort == null) {
- conn = (HttpURLConnection) url.openConnection();
- } else {
- final Proxy proxy = new Proxy(Type.HTTP, new InetSocketAddress(proxyHost, proxyPort));
- conn = (HttpURLConnection) url.openConnection(proxy);
- }
-
- if (authstrencoded != null) {
- conn.setRequestProperty("Authorization", "Basic " + authstrencoded);
- }
-
- // set the request method
- String method = trimToEmpty(context.getProperty(PROP_METHOD).evaluateAttributeExpressions(request).getValue()).toUpperCase();
- conn.setRequestMethod(method);
-
- // set timeouts
- conn.setConnectTimeout(context.getProperty(PROP_CONNECT_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue());
- conn.setReadTimeout(context.getProperty(PROP_READ_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue());
-
- // set whether to follow redirects
- conn.setInstanceFollowRedirects(context.getProperty(PROP_FOLLOW_REDIRECTS).asBoolean());
-
- // special handling for https
- if (conn instanceof HttpsURLConnection) {
- HttpsURLConnection sconn = (HttpsURLConnection) conn;
-
- // check if the ssl context is set
- if (sslContext != null) {
- sconn.setSSLSocketFactory(sslContext.getSocketFactory());
- }
-
- // check the trusted hostname property and override the HostnameVerifier
- String trustedHostname = trimToEmpty(context.getProperty(PROP_TRUSTED_HOSTNAME).getValue());
- if (!trustedHostname.isEmpty()) {
- sconn.setHostnameVerifier(new OverrideHostnameVerifier(trustedHostname, sconn.getHostnameVerifier()));
- }
- }
-
- }
-
- private void sendRequest() throws IOException {
- // set the http request properties using flowfile attribute values
- setRequestProperties();
+ Request httpRequest = configureRequest(context, session, requestFlowFile, url);
// log request
- logRequest();
+ logRequest(logger, httpRequest);
- // we only stream data for POST and PUT requests
- String method = conn.getRequestMethod().toUpperCase();
- if ("POST".equals(method) || "PUT".equals(method)) {
- conn.setDoOutput(true);
- conn.setFixedLengthStreamingMode(request.getSize());
-
- // write the flowfile contents to the output stream
- try (OutputStream os = new BufferedOutputStream(conn.getOutputStream())) {
- session.exportTo(request, os);
- }
-
- // emit provenance event
- session.getProvenanceReporter().send(request, conn.getURL().toExternalForm());
+ // emit send provenance event if successfully sent to the server
+ if (httpRequest.body() != null) {
+ session.getProvenanceReporter().send(requestFlowFile, url.toExternalForm(), true);
}
- }
-
- private void readResponse() throws IOException {
+ final long startNanos = System.nanoTime();
+ Response responseHttp = okHttpClient.newCall(httpRequest).execute();
// output the raw response headers (DEBUG level only)
- logResponse();
+ logResponse(logger, url, responseHttp);
// store the status code and message
- statusCode = conn.getResponseCode();
- statusMessage = conn.getResponseMessage();
+ int statusCode = responseHttp.code();
+ String statusMessage = responseHttp.message();
- // always write the status attributes to the request flowfile
- request = writeStatusAttributes(request);
-
- // read from the appropriate input stream
- try (InputStream is = getResponseStream()) {
-
- // if not successful, store the response body into a flowfile attribute
- if (!isSuccess()) {
- String body = trimToEmpty(toString(is, utf8));
- request = session.putAttribute(request, RESPONSE_BODY, body);
- }
-
- // if successful, store the response body as the flowfile payload
- // we include additional flowfile attributes including the reponse headers
- // and the status codes.
- if (isSuccess()) {
- // clone the flowfile to capture the response
- response = session.create(request);
-
- // write the status attributes
- response = writeStatusAttributes(response);
-
- // write the response headers as attributes
- // this will overwrite any existing flowfile attributes
- response = session.putAllAttributes(response, convertAttributesFromHeaders());
-
- // transfer the message body to the payload
- // can potentially be null in edge cases
- if (is != null) {
- response = session.importFrom(is, response);
-
- // emit provenance event
- final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
- session.getProvenanceReporter().fetch(response, conn.getURL().toExternalForm(), millis);
- }
-
- }
-
- }
-
- }
-
- private void transfer() throws IOException {
- // check if we should penalize the request
- if (!isSuccess()) {
- request = session.penalize(request);
- }
-
- // log the status codes from the response
- logger.info("Request to {} returned status code {} for {}",
- new Object[] {conn.getURL().toExternalForm(), statusCode, request});
-
- // transfer to the correct relationship
- // 2xx -> SUCCESS
- if (isSuccess()) {
- // we have two flowfiles to transfer
- session.transfer(request, REL_SUCCESS_REQ);
- session.transfer(response, REL_SUCCESS_RESP);
-
- // 5xx -> RETRY
- } else if (statusCode / 100 == 5) {
- session.transfer(request, REL_RETRY);
-
- // 1xx, 3xx, 4xx -> NO RETRY
- } else {
- session.transfer(request, REL_NO_RETRY);
- }
-
- }
-
- private void setRequestProperties() {
-
- // check if we should send the a Date header with the request
- if (context.getProperty(PROP_DATE_HEADER).asBoolean()) {
- conn.setRequestProperty("Date", getDateValue());
- }
-
- // iterate through the flowfile attributes, adding any attribute that
- // matches the attributes-to-send pattern. if the pattern is not set
- // (it's an optional property), ignore that attribute entirely
- if (attributesToSend != null) {
- Map attributes = request.getAttributes();
- Matcher m = attributesToSend.matcher("");
- for (Map.Entry entry : attributes.entrySet()) {
- String key = trimToEmpty(entry.getKey());
- String val = trimToEmpty(entry.getValue());
-
- // don't include any of the ignored attributes
- if (IGNORED_ATTRIBUTES.contains(key)) {
- continue;
- }
-
- // check if our attribute key matches the pattern
- // if so, include in the request as a header
- m.reset(key);
- if (m.matches()) {
- conn.setRequestProperty(key, val);
- }
- }
- }
- }
-
- /**
- * Returns a Map of flowfile attributes from the response http headers. Multivalue headers are naively converted to comma separated strings.
- */
- private Map convertAttributesFromHeaders() throws IOException {
- // create a new hashmap to store the values from the connection
- Map map = new HashMap<>();
- for (Map.Entry> entry : conn.getHeaderFields().entrySet()) {
- String key = entry.getKey();
- if (key == null) {
- continue;
- }
-
- List values = entry.getValue();
-
- // we ignore any headers with no actual values (rare)
- if (values == null || values.isEmpty()) {
- continue;
- }
-
- // create a comma separated string from the values, this is stored in the map
- String value = csv(values);
-
- // put the csv into the map
- map.put(key, value);
- }
-
- if (conn instanceof HttpsURLConnection) {
- HttpsURLConnection sconn = (HttpsURLConnection) conn;
- // this should seemingly not be required, but somehow the state of the jdk client is messed up
- // when retrieving SSL certificate related information if connect() has not been called previously.
- sconn.connect();
- map.put(REMOTE_DN, sconn.getPeerPrincipal().getName());
- }
-
- return map;
- }
-
- private boolean isSuccess() throws IOException {
if (statusCode == 0) {
throw new IllegalStateException("Status code unknown, connection hasn't been attempted.");
}
- return statusCode / 100 == 2;
- }
- private void logRequest() {
- logger.debug("\nRequest to remote service:\n\t{}\n{}",
- new Object[] {conn.getURL().toExternalForm(), getLogString(conn.getRequestProperties())});
- }
+ // Create a map of the status attributes that are always written to the request and reponse FlowFiles
+ Map statusAttributes = new HashMap<>();
+ statusAttributes.put(STATUS_CODE, String.valueOf(statusCode));
+ statusAttributes.put(STATUS_MESSAGE, statusMessage);
+ statusAttributes.put(REQUEST_URL, url.toExternalForm());
+ statusAttributes.put(TRANSACTION_ID, txId.toString());
- private void logResponse() {
- logger.debug("\nResponse from remote service:\n\t{}\n{}",
- new Object[] {conn.getURL().toExternalForm(), getLogString(conn.getHeaderFields())});
- }
-
- private String getLogString(Map> map) {
- StringBuilder sb = new StringBuilder();
- for (Map.Entry> entry : map.entrySet()) {
- List list = entry.getValue();
- if (list.isEmpty()) {
- continue;
- }
- sb.append("\t");
- sb.append(entry.getKey());
- sb.append(": ");
- if (list.size() == 1) {
- sb.append(list.get(0));
- } else {
- sb.append(list.toString());
- }
- sb.append("\n");
- }
- return sb.toString();
- }
-
- /**
- * Convert a collection of string values into a overly simple comma separated string.
- *
- * Does not handle the case where the value contains the delimiter. i.e. if a value contains a comma, this method does nothing to try and escape or quote the value, in traditional csv style.
- */
- private String csv(Collection values) {
- if (values == null || values.isEmpty()) {
- return "";
- }
- if (values.size() == 1) {
- return values.iterator().next();
+ if (requestFlowFile != null) {
+ requestFlowFile = session.putAllAttributes(requestFlowFile, statusAttributes);
}
- StringBuilder sb = new StringBuilder();
- for (String value : values) {
- value = value.trim();
- if (value.isEmpty()) {
- continue;
- }
- if (sb.length() > 0) {
- sb.append(", ");
- }
- sb.append(value);
- }
- return sb.toString().trim();
- }
-
- /**
- * Return the current datetime as an RFC 1123 formatted string in the GMT tz.
- */
- private String getDateValue() {
- return dateFormat.print(System.currentTimeMillis());
- }
-
- /**
- * Returns a string from the input stream using the specified character encoding.
- */
- private String toString(InputStream is, Charset charset) throws IOException {
- if (is == null) {
- return "";
+ // If the property to add the response headers to the request flowfile is true then add them
+ if (context.getProperty(PROP_ADD_HEADERS_TO_REQUEST).asBoolean() && requestFlowFile != null) {
+ // write the response headers as attributes
+ // this will overwrite any existing flowfile attributes
+ requestFlowFile = session.putAllAttributes(requestFlowFile, convertAttributesFromHeaders(url, responseHttp));
}
- ByteArrayOutputStream out = new ByteArrayOutputStream();
- byte[] buf = new byte[4096];
- int len;
- while ((len = is.read(buf)) != -1) {
- out.write(buf, 0, len);
- }
- return new String(out.toByteArray(), charset);
- }
+ boolean outputBodyToRequestAttribute = (!isSuccess(statusCode) || putToAttribute) && requestFlowFile != null;
+ boolean outputBodyToResponseContent = (isSuccess(statusCode) && !putToAttribute) || context.getProperty(PROP_OUTPUT_RESPONSE_REGARDLESS).asBoolean();
+ ResponseBody responseBody = responseHttp.body();
+ boolean bodyExists = responseBody != null;
- /**
- * Returns the input stream to use for reading from the remote server. We're either going to want the inputstream or errorstream, effectively depending on the status code.
- *
- * This method can return null if there is no inputstream to read from. For example, if the remote server did not send a message body. eg. 204 No Content or 304 Not Modified
- */
- private InputStream getResponseStream() {
+ InputStream responseBodyStream = null;
+ SoftLimitBoundedByteArrayOutputStream outputStreamToRequestAttribute = null;
+ TeeInputStream teeInputStream = null;
try {
- InputStream is = conn.getErrorStream();
- if (is == null) {
- is = conn.getInputStream();
+ responseBodyStream = bodyExists ? responseBody.byteStream() : null;
+ if (responseBodyStream != null && outputBodyToRequestAttribute && outputBodyToResponseContent) {
+ outputStreamToRequestAttribute = new SoftLimitBoundedByteArrayOutputStream(maxAttributeSize);
+ teeInputStream = new TeeInputStream(responseBodyStream, outputStreamToRequestAttribute);
}
- return new BufferedInputStream(is);
- } catch (IOException e) {
- logger.warn("Response stream threw an exception: {}", new Object[] {e}, e);
- return null;
- }
- }
+ if (outputBodyToResponseContent) {
+ /*
+ * If successful and putting to response flowfile, store the response body as the flowfile payload
+ * we include additional flowfile attributes including the response headers and the status codes.
+ */
- /**
- * Writes the status attributes onto the flowfile, returning the flowfile that was updated.
- */
- private FlowFile writeStatusAttributes(FlowFile flowfile) {
- flowfile = session.putAttribute(flowfile, STATUS_CODE, String.valueOf(statusCode));
- flowfile = session.putAttribute(flowfile, STATUS_MESSAGE, statusMessage);
- flowfile = session.putAttribute(flowfile, REQUEST_URL, conn.getURL().toExternalForm());
- flowfile = session.putAttribute(flowfile, TRANSACTION_ID, Long.toString(txId));
- return flowfile;
- }
+ // clone the flowfile to capture the response
+ if (requestFlowFile != null) {
+ responseFlowFile = session.create(requestFlowFile);
+ } else {
+ responseFlowFile = session.create();
+ }
- /**
- *
- */
- private static class OverrideHostnameVerifier implements HostnameVerifier {
+ // write the status attributes
+ responseFlowFile = session.putAllAttributes(responseFlowFile, statusAttributes);
- private final String trustedHostname;
- private final HostnameVerifier delegate;
+ // write the response headers as attributes
+ // this will overwrite any existing flowfile attributes
+ responseFlowFile = session.putAllAttributes(responseFlowFile, convertAttributesFromHeaders(url, responseHttp));
- private OverrideHostnameVerifier(String trustedHostname, HostnameVerifier delegate) {
- this.trustedHostname = trustedHostname;
- this.delegate = delegate;
+ // transfer the message body to the payload
+ // can potentially be null in edge cases
+ if (bodyExists) {
+ if (teeInputStream != null) {
+ responseFlowFile = session.importFrom(teeInputStream, responseFlowFile);
+ } else {
+ responseFlowFile = session.importFrom(responseBodyStream, responseFlowFile);
+ }
+
+ // emit provenance event
+ final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
+ session.getProvenanceReporter().fetch(responseFlowFile, url.toExternalForm(), millis);
+ }
+ }
+
+ // if not successful and request flowfile is not null, store the response body into a flowfile attribute
+ if (outputBodyToRequestAttribute && bodyExists) {
+ String attributeKey = context.getProperty(PROP_PUT_OUTPUT_IN_ATTRIBUTE).evaluateAttributeExpressions(requestFlowFile).getValue();
+ if (attributeKey == null) {
+ attributeKey = RESPONSE_BODY;
+ }
+ byte[] outputBuffer;
+ int size;
+
+ if (outputStreamToRequestAttribute != null) {
+ outputBuffer = outputStreamToRequestAttribute.getBuffer();
+ size = outputStreamToRequestAttribute.size();
+ } else {
+ outputBuffer = new byte[maxAttributeSize];
+ size = StreamUtils.fillBuffer(responseBodyStream, outputBuffer, false);
+ }
+ String bodyString = new String(outputBuffer, 0, size, getCharsetFromMediaType(responseBody.contentType()));
+ requestFlowFile = session.putAttribute(requestFlowFile, attributeKey, bodyString);
+
+ final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
+ session.getProvenanceReporter().modifyAttributes(requestFlowFile, "The " + attributeKey + " has been added. The value of which is the body of a http call to "
+ + url.toExternalForm() + ". It took " + millis + "millis,");
+ }
+ } finally {
+ if(outputStreamToRequestAttribute != null){
+ outputStreamToRequestAttribute.close();
+ outputStreamToRequestAttribute = null;
+ }
+ if(teeInputStream != null){
+ teeInputStream.close();
+ teeInputStream = null;
+ } else if(responseBodyStream != null){
+ responseBodyStream.close();
+ responseBodyStream = null;
+ }
}
- @Override
- public boolean verify(String hostname, SSLSession session) {
- if (trustedHostname.equalsIgnoreCase(hostname)) {
- return true;
+ route(requestFlowFile, responseFlowFile, session, context, statusCode);
+ } catch (final Exception e) {
+ // penalize or yield
+ if (requestFlowFile != null) {
+ logger.error("Routing to {} due to exception: {}", new Object[]{REL_FAILURE.getName(), e}, e);
+ requestFlowFile = session.penalize(requestFlowFile);
+ // transfer original to failure
+ session.transfer(requestFlowFile, REL_FAILURE);
+ } else {
+ logger.error("Yielding processor due to exception encountered as a source processor: {}", e);
+ context.yield();
+ }
+
+
+ // cleanup response flowfile, if applicable
+ try {
+ if (responseFlowFile != null) {
+ session.remove(responseFlowFile);
}
- return delegate.verify(hostname, session);
+ } catch (final Exception e1) {
+ logger.error("Could not cleanup response flowfile due to exception: {}", new Object[]{e1}, e1);
}
}
}
+
+ private Request configureRequest(final ProcessContext context, final ProcessSession session, final FlowFile requestFlowFile, URL url) {
+ Request.Builder requestBuilder = new Request.Builder();
+
+ requestBuilder = requestBuilder.url(url);
+ final String authUser = trimToEmpty(context.getProperty(PROP_BASIC_AUTH_USERNAME).getValue());
+
+ // If the username/password properties are set then check if digest auth is being used
+ if (!authUser.isEmpty() && "false".equalsIgnoreCase(context.getProperty(PROP_DIGEST_AUTH).getValue())) {
+ final String authPass = trimToEmpty(context.getProperty(PROP_BASIC_AUTH_PASSWORD).getValue());
+
+ String credential = com.squareup.okhttp.Credentials.basic(authUser, authPass);
+ requestBuilder = requestBuilder.header("Authorization", credential);
+ }
+
+ // set the request method
+ String method = trimToEmpty(context.getProperty(PROP_METHOD).evaluateAttributeExpressions(requestFlowFile).getValue()).toUpperCase();
+ switch (method) {
+ case "GET":
+ requestBuilder = requestBuilder.get();
+ break;
+ case "POST":
+ RequestBody requestBody = getRequestBodyToSend(session, requestFlowFile);
+ requestBuilder = requestBuilder.post(requestBody);
+ break;
+ case "PUT":
+ requestBody = getRequestBodyToSend(session, requestFlowFile);
+ requestBuilder = requestBuilder.put(requestBody);
+ break;
+ case "HEAD":
+ requestBuilder = requestBuilder.head();
+ break;
+ case "DELETE":
+ requestBuilder = requestBuilder.delete();
+ break;
+ default:
+ requestBuilder = requestBuilder.method(method, null);
+ }
+
+ requestBuilder = setHeaderProperties(context, requestBuilder, requestFlowFile);
+
+ return requestBuilder.build();
+ }
+
+ private RequestBody getRequestBodyToSend(final ProcessSession session, final FlowFile requestFlowFile) {
+ return new RequestBody() {
+ @Override
+ public MediaType contentType() {
+ final String attributeValue = requestFlowFile.getAttribute(CoreAttributes.MIME_TYPE.key());
+ String contentType = attributeValue == null ? DEFAULT_CONTENT_TYPE : attributeValue;
+ return MediaType.parse(contentType);
+ }
+
+ @Override
+ public void writeTo(BufferedSink sink) throws IOException {
+ session.exportTo(requestFlowFile, sink.outputStream());
+ }
+ };
+ }
+
+ private Request.Builder setHeaderProperties(final ProcessContext context, Request.Builder requestBuilder, final FlowFile requestFlowFile) {
+ // check if we should send the a Date header with the request
+ if (context.getProperty(PROP_DATE_HEADER).asBoolean()) {
+ requestBuilder = requestBuilder.addHeader("Date", DATE_FORMAT.print(System.currentTimeMillis()));
+ }
+
+ for (String headerKey : dynamicPropertyNames) {
+ String headerValue = context.getProperty(headerKey).evaluateAttributeExpressions(requestFlowFile).getValue();
+ requestBuilder = requestBuilder.addHeader(headerKey, headerValue);
+ }
+
+ // iterate through the flowfile attributes, adding any attribute that
+ // matches the attributes-to-send pattern. if the pattern is not set
+ // (it's an optional property), ignore that attribute entirely
+ if (regexAttributesToSend != null) {
+ Map attributes = requestFlowFile.getAttributes();
+ Matcher m = regexAttributesToSend.matcher("");
+ for (Map.Entry entry : attributes.entrySet()) {
+ String headerKey = trimToEmpty(entry.getKey());
+
+ // don't include any of the ignored attributes
+ if (IGNORED_ATTRIBUTES.contains(headerKey)) {
+ continue;
+ }
+
+ // check if our attribute key matches the pattern
+ // if so, include in the request as a header
+ m.reset(headerKey);
+ if (m.matches()) {
+ String headerVal = trimToEmpty(entry.getValue());
+ requestBuilder = requestBuilder.addHeader(headerKey, headerVal);
+ }
+ }
+ }
+ return requestBuilder;
+ }
+
+
+ private void route(FlowFile request, FlowFile response, ProcessSession session, ProcessContext context, int statusCode){
+ // check if we should penalize the request
+ if (!isSuccess(statusCode)) {
+ if (request == null) {
+ context.yield();
+ } else {
+ request = session.penalize(request);
+ }
+ }
+
+ // If the property to output the response flowfile regardless of status code is set then transfer it
+ boolean responseSent = false;
+ if (context.getProperty(PROP_OUTPUT_RESPONSE_REGARDLESS).asBoolean()) {
+ session.transfer(response, REL_RESPONSE);
+ responseSent = true;
+ }
+
+ // transfer to the correct relationship
+ // 2xx -> SUCCESS
+ if (isSuccess(statusCode)) {
+ // we have two flowfiles to transfer
+ if (request != null) {
+ session.transfer(request, REL_SUCCESS_REQ);
+ }
+ if (response != null && !responseSent) {
+ session.transfer(response, REL_RESPONSE);
+ }
+
+ // 5xx -> RETRY
+ } else if (statusCode / 100 == 5) {
+ if (request != null) {
+ session.transfer(request, REL_RETRY);
+ }
+
+ // 1xx, 3xx, 4xx -> NO RETRY
+ } else {
+ if (request != null) {
+ session.transfer(request, REL_NO_RETRY);
+ }
+ }
+
+ }
+
+ private boolean isSuccess(int statusCode) {
+ return statusCode / 100 == 2;
+ }
+
+ private void logRequest(ProcessorLog logger, Request request) {
+ logger.debug("\nRequest to remote service:\n\t{}\n{}",
+ new Object[]{request.url().toExternalForm(), getLogString(request.headers().toMultimap())});
+ }
+
+ private void logResponse(ProcessorLog logger, URL url, Response response) {
+ logger.debug("\nResponse from remote service:\n\t{}\n{}",
+ new Object[]{url.toExternalForm(), getLogString(response.headers().toMultimap())});
+ }
+
+ private String getLogString(Map> map) {
+ StringBuilder sb = new StringBuilder();
+ for (Map.Entry> entry : map.entrySet()) {
+ List list = entry.getValue();
+ if (list.isEmpty()) {
+ continue;
+ }
+ sb.append("\t");
+ sb.append(entry.getKey());
+ sb.append(": ");
+ if (list.size() == 1) {
+ sb.append(list.get(0));
+ } else {
+ sb.append(list.toString());
+ }
+ sb.append("\n");
+ }
+ return sb.toString();
+ }
+
+ /**
+ * Convert a collection of string values into a overly simple comma separated string.
+ *
+ * Does not handle the case where the value contains the delimiter. i.e. if a value contains a comma, this method does nothing to try and escape or quote the value, in traditional csv style.
+ */
+ private String csv(Collection values) {
+ if (values == null || values.isEmpty()) {
+ return "";
+ }
+ if (values.size() == 1) {
+ return values.iterator().next();
+ }
+
+ StringBuilder sb = new StringBuilder();
+ for (String value : values) {
+ value = value.trim();
+ if (value.isEmpty()) {
+ continue;
+ }
+ if (sb.length() > 0) {
+ sb.append(", ");
+ }
+ sb.append(value);
+ }
+ return sb.toString().trim();
+ }
+
+ /**
+ * Returns a Map of flowfile attributes from the response http headers. Multivalue headers are naively converted to comma separated strings.
+ */
+ private Map convertAttributesFromHeaders(URL url, Response responseHttp){
+ // create a new hashmap to store the values from the connection
+ Map map = new HashMap<>();
+ for (Map.Entry> entry : responseHttp.headers().toMultimap().entrySet()) {
+ String key = entry.getKey();
+ if (key == null) {
+ continue;
+ }
+
+ List values = entry.getValue();
+
+ // we ignore any headers with no actual values (rare)
+ if (values == null || values.isEmpty()) {
+ continue;
+ }
+
+ // create a comma separated string from the values, this is stored in the map
+ String value = csv(values);
+
+ // put the csv into the map
+ map.put(key, value);
+ }
+
+ if ("HTTPS".equals(url.getProtocol().toUpperCase())) {
+ map.put(REMOTE_DN, responseHttp.handshake().peerPrincipal().getName());
+ }
+
+ return map;
+ }
+
+ private Charset getCharsetFromMediaType(MediaType contentType) {
+ return contentType != null ? contentType.charset(StandardCharsets.UTF_8) : StandardCharsets.UTF_8;
+ }
+
+ private static class OverrideHostnameVerifier implements HostnameVerifier {
+
+ private final String trustedHostname;
+ private final HostnameVerifier delegate;
+
+ private OverrideHostnameVerifier(String trustedHostname, HostnameVerifier delegate) {
+ this.trustedHostname = trustedHostname;
+ this.delegate = delegate;
+ }
+
+ @Override
+ public boolean verify(String hostname, SSLSession session) {
+ if (trustedHostname.equalsIgnoreCase(hostname)) {
+ return true;
+ }
+ return delegate.verify(hostname, session);
+ }
+ }
}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestInvokeHTTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestInvokeHTTP.java
index a26b2eddac..a82bc5a1f6 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestInvokeHTTP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestInvokeHTTP.java
@@ -19,22 +19,27 @@ package org.apache.nifi.processors.standard;
import java.io.IOException;
import java.io.PrintWriter;
import java.net.URL;
+import java.util.HashMap;
+import java.util.Map;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.nifi.processors.standard.util.TestInvokeHttpCommon;
+import org.apache.nifi.ssl.StandardSSLContextService;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunners;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.junit.After;
import org.junit.AfterClass;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
+
public class TestInvokeHTTP extends TestInvokeHttpCommon {
@BeforeClass
@@ -72,41 +77,99 @@ public class TestInvokeHTTP extends TestInvokeHttpCommon {
return new TestServer();
}
+ @Test
+ public void testSslSetHttpRequest() throws Exception {
+
+ final Map sslProperties = new HashMap<>();
+ sslProperties.put(StandardSSLContextService.KEYSTORE.getName(), "src/test/resources/localhost-ks.jks");
+ sslProperties.put(StandardSSLContextService.KEYSTORE_PASSWORD.getName(), "localtest");
+ sslProperties.put(StandardSSLContextService.KEYSTORE_TYPE.getName(), "JKS");
+ sslProperties.put(StandardSSLContextService.TRUSTSTORE.getName(), "src/test/resources/localhost-ts.jks");
+ sslProperties.put(StandardSSLContextService.TRUSTSTORE_PASSWORD.getName(), "localtest");
+ sslProperties.put(StandardSSLContextService.TRUSTSTORE_TYPE.getName(), "JKS");
+
+ runner = TestRunners.newTestRunner(InvokeHTTP.class);
+ final StandardSSLContextService sslService = new StandardSSLContextService();
+ runner.addControllerService("ssl-context", sslService, sslProperties);
+ runner.enableControllerService(sslService);
+ runner.setProperty(InvokeHTTP.PROP_SSL_CONTEXT_SERVICE, "ssl-context");
+
+ addHandler(new GetOrHeadHandler());
+
+ runner.setProperty(InvokeHTTP.PROP_URL, url + "/status/200");
+
+ createFlowFiles(runner);
+
+ runner.run();
+
+ runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 1);
+ runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 1);
+ runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
+ runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
+ runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
+
+ // expected in request status.code and status.message
+ // original flow file (+attributes)
+ final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_SUCCESS_REQ).get(0);
+ bundle.assertContentEquals("Hello".getBytes("UTF-8"));
+ bundle.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200");
+ bundle.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK");
+ bundle.assertAttributeEquals("Foo", "Bar");
+
+ // expected in response
+ // status code, status message, all headers from server response --> ff attributes
+ // server response message body into payload of ff
+ final MockFlowFile bundle1 = runner.getFlowFilesForRelationship(InvokeHTTP.REL_RESPONSE).get(0);
+ bundle1.assertContentEquals("/status/200".getBytes("UTF-8"));
+ bundle1.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200");
+ bundle1.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK");
+ bundle1.assertAttributeEquals("Foo", "Bar");
+ bundle1.assertAttributeEquals("Content-Type", "text/plain; charset=ISO-8859-1");
+ bundle1.assertAttributeEquals("OkHttp-Selected-Protocol", "http/1.1");
+ }
+
// Currently InvokeHttp does not support Proxy via Https
@Test
public void testProxy() throws Exception {
addHandler(new MyProxyHandler());
URL proxyURL = new URL(url);
- runner.setProperty(InvokeHTTP.Config.PROP_URL, "http://nifi.apache.org/"); // just a dummy URL no connection goes out
- runner.setProperty(InvokeHTTP.Config.PROP_PROXY_HOST, proxyURL.getHost());
- runner.setProperty(InvokeHTTP.Config.PROP_PROXY_PORT, String.valueOf(proxyURL.getPort()));
+ runner.setProperty(InvokeHTTP.PROP_URL, "http://nifi.apache.org/"); // just a dummy URL no connection goes out
+ runner.setProperty(InvokeHTTP.PROP_PROXY_HOST, proxyURL.getHost());
+
+ try{
+ runner.run();
+ Assert.fail();
+ } catch (AssertionError e){
+ // Expect assetion error when proxy port isn't set but host is.
+ }
+ runner.setProperty(InvokeHTTP.PROP_PROXY_PORT, String.valueOf(proxyURL.getPort()));
createFlowFiles(runner);
runner.run();
- runner.assertTransferCount(InvokeHTTP.Config.REL_SUCCESS_REQ, 1);
- runner.assertTransferCount(InvokeHTTP.Config.REL_SUCCESS_RESP, 1);
- runner.assertTransferCount(InvokeHTTP.Config.REL_RETRY, 0);
- runner.assertTransferCount(InvokeHTTP.Config.REL_NO_RETRY, 0);
- runner.assertTransferCount(InvokeHTTP.Config.REL_FAILURE, 0);
+ runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 1);
+ runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 1);
+ runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
+ runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
+ runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
//expected in request status.code and status.message
//original flow file (+attributes)
- final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.Config.REL_SUCCESS_REQ).get(0);
+ final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_SUCCESS_REQ).get(0);
bundle.assertContentEquals("Hello".getBytes("UTF-8"));
- bundle.assertAttributeEquals(InvokeHTTP.Config.STATUS_CODE, "200");
- bundle.assertAttributeEquals(InvokeHTTP.Config.STATUS_MESSAGE, "OK");
+ bundle.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200");
+ bundle.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK");
bundle.assertAttributeEquals("Foo", "Bar");
//expected in response
//status code, status message, all headers from server response --> ff attributes
//server response message body into payload of ff
- final MockFlowFile bundle1 = runner.getFlowFilesForRelationship(InvokeHTTP.Config.REL_SUCCESS_RESP).get(0);
+ final MockFlowFile bundle1 = runner.getFlowFilesForRelationship(InvokeHTTP.REL_RESPONSE).get(0);
bundle1.assertContentEquals("http://nifi.apache.org/".getBytes("UTF-8"));
- bundle1.assertAttributeEquals(InvokeHTTP.Config.STATUS_CODE, "200");
- bundle1.assertAttributeEquals(InvokeHTTP.Config.STATUS_MESSAGE, "OK");
+ bundle1.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200");
+ bundle1.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK");
bundle1.assertAttributeEquals("Foo", "Bar");
bundle1.assertAttributeEquals("Content-Type", "text/plain; charset=ISO-8859-1");
}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestInvokeHttpSSL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestInvokeHttpSSL.java
index d155b74d48..b3cd9dc7a2 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestInvokeHttpSSL.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestInvokeHttpSSL.java
@@ -17,7 +17,6 @@
package org.apache.nifi.processors.standard;
-import org.apache.nifi.processors.standard.InvokeHTTP.Config;
import org.apache.nifi.processors.standard.util.TestInvokeHttpCommon;
import org.apache.nifi.ssl.StandardSSLContextService;
import org.apache.nifi.util.TestRunners;
@@ -63,7 +62,7 @@ public class TestInvokeHttpSSL extends TestInvokeHttpCommon {
final StandardSSLContextService sslService = new StandardSSLContextService();
runner.addControllerService("ssl-context", sslService, sslProperties);
runner.enableControllerService(sslService);
- runner.setProperty(Config.PROP_SSL_CONTEXT_SERVICE, "ssl-context");
+ runner.setProperty(InvokeHTTP.PROP_SSL_CONTEXT_SERVICE, "ssl-context");
server.clearHandlers();
}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestInvokeHttpCommon.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestInvokeHttpCommon.java
index 88dfcdb056..b44f01522c 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestInvokeHttpCommon.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestInvokeHttpCommon.java
@@ -24,9 +24,16 @@ import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
+import org.eclipse.jetty.security.ConstraintSecurityHandler;
+import org.eclipse.jetty.security.DefaultIdentityService;
+import org.eclipse.jetty.security.HashLoginService;
+import org.eclipse.jetty.security.ServerAuthException;
+import org.eclipse.jetty.security.authentication.DigestAuthenticator;
+import org.eclipse.jetty.server.Authentication;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.handler.AbstractHandler;
+import org.eclipse.jetty.util.security.Password;
import org.junit.Assert;
import org.junit.Test;
@@ -67,7 +74,7 @@ public abstract class TestInvokeHttpCommon {
final DateHandler dh = new DateHandler();
addHandler(dh);
- runner.setProperty(InvokeHTTP.Config.PROP_URL, url);
+ runner.setProperty(InvokeHTTP.PROP_URL, url);
createFlowFiles(runner);
runner.run();
@@ -94,35 +101,356 @@ public abstract class TestInvokeHttpCommon {
public void test200() throws Exception {
addHandler(new GetOrHeadHandler());
- runner.setProperty(InvokeHTTP.Config.PROP_URL, url + "/status/200");
+ runner.setProperty(InvokeHTTP.PROP_URL, url + "/status/200");
createFlowFiles(runner);
+ // Verify only one FlowFile gets created/sent
+ runner.run();
runner.run();
- runner.assertTransferCount(InvokeHTTP.Config.REL_SUCCESS_REQ, 1);
- runner.assertTransferCount(InvokeHTTP.Config.REL_SUCCESS_RESP, 1);
- runner.assertTransferCount(InvokeHTTP.Config.REL_RETRY, 0);
- runner.assertTransferCount(InvokeHTTP.Config.REL_NO_RETRY, 0);
- runner.assertTransferCount(InvokeHTTP.Config.REL_FAILURE, 0);
+ runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 1);
+ runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 1);
+ runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
+ runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
+ runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
// expected in request status.code and status.message
// original flow file (+attributes)
- final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.Config.REL_SUCCESS_REQ).get(0);
+ final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_SUCCESS_REQ).get(0);
bundle.assertContentEquals("Hello".getBytes("UTF-8"));
- bundle.assertAttributeEquals(InvokeHTTP.Config.STATUS_CODE, "200");
- bundle.assertAttributeEquals(InvokeHTTP.Config.STATUS_MESSAGE, "OK");
+ bundle.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200");
+ bundle.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK");
bundle.assertAttributeEquals("Foo", "Bar");
// expected in response
// status code, status message, all headers from server response --> ff attributes
// server response message body into payload of ff
- final MockFlowFile bundle1 = runner.getFlowFilesForRelationship(InvokeHTTP.Config.REL_SUCCESS_RESP).get(0);
+ final MockFlowFile bundle1 = runner.getFlowFilesForRelationship(InvokeHTTP.REL_RESPONSE).get(0);
bundle1.assertContentEquals("/status/200".getBytes("UTF-8"));
- bundle1.assertAttributeEquals(InvokeHTTP.Config.STATUS_CODE, "200");
- bundle1.assertAttributeEquals(InvokeHTTP.Config.STATUS_MESSAGE, "OK");
+ bundle1.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200");
+ bundle1.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK");
bundle1.assertAttributeEquals("Foo", "Bar");
bundle1.assertAttributeEquals("Content-Type", "text/plain; charset=ISO-8859-1");
+
+ final List provEvents = runner.getProvenanceEvents();
+ assertEquals(2, provEvents.size());
+ boolean forkEvent = false;
+ boolean fetchEvent = false;
+ for (final ProvenanceEventRecord event : provEvents) {
+ if (event.getEventType() == ProvenanceEventType.FORK) {
+ forkEvent = true;
+ } else if (event.getEventType() == ProvenanceEventType.FETCH) {
+ fetchEvent = true;
+ }
+ }
+
+ assertTrue(forkEvent);
+ assertTrue(fetchEvent);
+ }
+
+ @Test
+ public void testOutputResponseRegardless() throws Exception {
+ addHandler(new GetOrHeadHandler());
+
+ runner.setProperty(InvokeHTTP.PROP_URL, url + "/status/200");
+ runner.setProperty(InvokeHTTP.PROP_METHOD, "POST");
+ runner.setProperty(InvokeHTTP.PROP_OUTPUT_RESPONSE_REGARDLESS,"true");
+
+ createFlowFiles(runner);
+
+ runner.run();
+
+ runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 0);
+ runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 1);
+ runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
+ runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 1);
+ runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
+
+ // expected in request status.code and status.message
+ // original flow file (+attributes)
+ final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_NO_RETRY).get(0);
+ bundle.assertContentEquals("Hello".getBytes("UTF-8"));
+ bundle.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "404");
+ bundle.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "Not Found");
+ bundle.assertAttributeEquals("Foo", "Bar");
+
+ // expected in response
+ // status code, status message, all headers from server response --> ff attributes
+ // server response message body into payload of ff
+ final MockFlowFile bundle1 = runner.getFlowFilesForRelationship(InvokeHTTP.REL_RESPONSE).get(0);
+ bundle1.assertContentEquals("NO".getBytes("UTF-8"));
+ bundle1.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "404");
+ bundle1.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "Not Found");
+ bundle1.assertAttributeEquals("Foo", "Bar");
+ bundle1.assertAttributeEquals("Content-Type", "text/plain; charset=ISO-8859-1");
+ }
+
+ @Test
+ public void testOutputResponseRegardlessWithOutputInAttribute() throws Exception {
+ addHandler(new GetOrHeadHandler());
+
+ runner.setProperty(InvokeHTTP.PROP_URL, url + "/status/200");
+ runner.setProperty(InvokeHTTP.PROP_METHOD, "POST");
+ runner.setProperty(InvokeHTTP.PROP_OUTPUT_RESPONSE_REGARDLESS,"true");
+ runner.setProperty(InvokeHTTP.PROP_PUT_OUTPUT_IN_ATTRIBUTE,"outputBody");
+
+ createFlowFiles(runner);
+
+ runner.run();
+
+ runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 0);
+ runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 1);
+ runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
+ runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 1);
+ runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
+
+ // expected in request status.code and status.message
+ // original flow file (+attributes)
+ final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_NO_RETRY).get(0);
+ bundle.assertContentEquals("Hello".getBytes("UTF-8"));
+ bundle.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "404");
+ bundle.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "Not Found");
+ bundle.assertAttributeEquals("outputBody", "NO");
+ bundle.assertAttributeEquals("Foo", "Bar");
+
+ // expected in response
+ // status code, status message, all headers from server response --> ff attributes
+ // server response message body into payload of ff
+ final MockFlowFile bundle1 = runner.getFlowFilesForRelationship(InvokeHTTP.REL_RESPONSE).get(0);
+ bundle1.assertContentEquals("NO".getBytes("UTF-8"));
+ bundle1.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "404");
+ bundle1.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "Not Found");
+ bundle1.assertAttributeEquals("Foo", "Bar");
+ bundle1.assertAttributeEquals("Content-Type", "text/plain; charset=ISO-8859-1");
+ }
+
+ @Test
+ public void testOutputResponseRegardlessWithOutputInAttributeLarge() throws Exception {
+ addHandler(new GetLargeHandler());
+
+ runner.setProperty(InvokeHTTP.PROP_URL, url + "/status/200");
+ runner.setProperty(InvokeHTTP.PROP_METHOD, "POST");
+ runner.setProperty(InvokeHTTP.PROP_OUTPUT_RESPONSE_REGARDLESS,"true");
+ runner.setProperty(InvokeHTTP.PROP_PUT_OUTPUT_IN_ATTRIBUTE,"outputBody");
+ runner.setProperty(InvokeHTTP.PROP_PUT_ATTRIBUTE_MAX_LENGTH,"11");
+
+ createFlowFiles(runner);
+
+ runner.run();
+
+ runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 0);
+ runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 1);
+ runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
+ runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 1);
+ runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
+
+ // expected in request status.code and status.message
+ // original flow file (+attributes)
+ final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_NO_RETRY).get(0);
+ bundle.assertContentEquals("Hello".getBytes("UTF-8"));
+ bundle.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "404");
+ bundle.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "Not Found");
+ bundle.assertAttributeEquals("outputBody", "Lorem ipsum");
+ bundle.assertAttributeEquals("Foo", "Bar");
+
+ // expected in response
+ // status code, status message, all headers from server response --> ff attributes
+ // server response message body into payload of ff
+ final MockFlowFile bundle1 = runner.getFlowFilesForRelationship(InvokeHTTP.REL_RESPONSE).get(0);
+ bundle1.assertContentEquals("Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. "
+ + "Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor "
+ + "in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, "
+ + "sunt in culpa qui officia deserunt mollit anim id est laborum.");
+ bundle1.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "404");
+ bundle1.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "Not Found");
+ bundle1.assertAttributeEquals("Foo", "Bar");
+ bundle1.assertAttributeEquals("Content-Type", "text/plain; charset=ISO-8859-1");
+ }
+
+
+ @Test
+ public void testMultipleSameHeaders() throws Exception {
+ addHandler(new GetMultipleHeaderHandler());
+
+ runner.setProperty(InvokeHTTP.PROP_URL, url + "/status/200");
+
+ createFlowFiles(runner);
+
+ runner.run();
+
+ runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 1);
+ runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 1);
+ runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
+ runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
+ runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
+
+ // expected in request status.code and status.message
+ // original flow file (+attributes)
+ final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_SUCCESS_REQ).get(0);
+ bundle.assertContentEquals("Hello".getBytes("UTF-8"));
+ bundle.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200");
+ bundle.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK");
+ bundle.assertAttributeEquals("Foo", "Bar");
+
+ // expected in response
+ // status code, status message, all headers from server response --> ff attributes
+ // server response message body into payload of ff
+ final MockFlowFile bundle1 = runner.getFlowFilesForRelationship(InvokeHTTP.REL_RESPONSE).get(0);
+ bundle1.assertContentEquals("/status/200".getBytes("UTF-8"));
+ bundle1.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200");
+ bundle1.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK");
+ bundle1.assertAttributeEquals("Foo", "Bar");
+ bundle1.assertAttributeEquals("double", "1, 2");
+ bundle1.assertAttributeEquals("Content-Type", "text/plain; charset=ISO-8859-1");
+ }
+
+ @Test
+ public void testPutResponseHeadersInRequest() throws Exception {
+ addHandler(new GetMultipleHeaderHandler());
+
+ runner.setProperty(InvokeHTTP.PROP_URL, url + "/status/200");
+ runner.setProperty(InvokeHTTP.PROP_ADD_HEADERS_TO_REQUEST, "true");
+
+ createFlowFiles(runner);
+
+ runner.run();
+
+ runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 1);
+ runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 1);
+ runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
+ runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
+ runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
+
+ // expected in request status.code and status.message
+ // original flow file (+all attributes from response)
+ final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_SUCCESS_REQ).get(0);
+ bundle.assertContentEquals("Hello".getBytes("UTF-8"));
+ bundle.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200");
+ bundle.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK");
+ bundle.assertAttributeEquals("Foo", "Bar");
+ bundle.assertAttributeEquals("double", "1, 2");
+ bundle.assertAttributeEquals("Content-Type", "text/plain; charset=ISO-8859-1");
+
+ // expected in response
+ // status code, status message, all headers from server response --> ff attributes
+ // server response message body into payload of ff
+ final MockFlowFile bundle1 = runner.getFlowFilesForRelationship(InvokeHTTP.REL_RESPONSE).get(0);
+ bundle1.assertContentEquals("/status/200".getBytes("UTF-8"));
+ bundle1.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200");
+ bundle1.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK");
+ bundle1.assertAttributeEquals("Foo", "Bar");
+ bundle1.assertAttributeEquals("double", "1, 2");
+ bundle1.assertAttributeEquals("Content-Type", "text/plain; charset=ISO-8859-1");
+ }
+
+ @Test
+ public void testToRequestAttribute() throws Exception {
+ addHandler(new GetOrHeadHandler());
+
+ runner.setProperty(InvokeHTTP.PROP_URL, url + "/status/200");
+ runner.setProperty(InvokeHTTP.PROP_PUT_OUTPUT_IN_ATTRIBUTE,"outputBody");
+
+ createFlowFiles(runner);
+
+ runner.run();
+
+ runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 1);
+ runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 0);
+ runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
+ runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
+ runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
+
+ // expected in request status.code, status.message and body of response in attribute
+ // original flow file (+attributes)
+ final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_SUCCESS_REQ).get(0);
+ bundle.assertContentEquals("Hello".getBytes("UTF-8"));
+ bundle.assertAttributeEquals("outputBody", "/status/200");
+ bundle.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200");
+ bundle.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK");
+ bundle.assertAttributeEquals("Foo", "Bar");
+ }
+
+ @Test
+ public void testNoInput() throws Exception {
+ addHandler(new GetOrHeadHandler());
+
+ runner.setProperty(InvokeHTTP.PROP_URL, url + "/status/200");
+ runner.setProperty(InvokeHTTP.PROP_METHOD,"GET");
+ runner.setIncomingConnection(false);
+ runner.setNonLoopConnection(false);
+
+ runner.run();
+
+ runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 0);
+ runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 1);
+ runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
+ runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
+ runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
+
+ // expected in response
+ // status code, status message, all headers from server response --> ff attributes
+ // server response message body into payload of ff
+ final MockFlowFile bundle1 = runner.getFlowFilesForRelationship(InvokeHTTP.REL_RESPONSE).get(0);
+ bundle1.assertContentEquals("/status/200".getBytes("UTF-8"));
+ bundle1.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200");
+ bundle1.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK");
+ bundle1.assertAttributeEquals("Content-Type", "text/plain; charset=ISO-8859-1");
+ }
+
+ @Test
+ public void testNoInputFail() throws Exception {
+ addHandler(new GetOrHeadHandler());
+
+ runner.setProperty(InvokeHTTP.PROP_URL, url + "/status/200");
+ runner.setProperty(InvokeHTTP.PROP_METHOD, "POST");
+ runner.setIncomingConnection(false);
+ runner.setNonLoopConnection(false);
+
+ runner.run();
+
+ runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 0);
+ runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 0);
+ runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
+ runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
+ runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
+
+ runner.setProperty(InvokeHTTP.PROP_METHOD,"OPTION");
+
+ runner.run();
+
+ runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 0);
+ runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 0);
+ runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
+ runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
+ runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
+ }
+
+ @Test
+ public void testNoInputSendToAttribute() throws Exception {
+ addHandler(new GetOrHeadHandler());
+
+ runner.setProperty(InvokeHTTP.PROP_URL, url + "/status/200");
+ runner.setProperty(InvokeHTTP.PROP_PUT_OUTPUT_IN_ATTRIBUTE, "outputBody");
+ runner.setIncomingConnection(false);
+ runner.setNonLoopConnection(false);
+
+ runner.run();
+
+ runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 1);
+ runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 0);
+ runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
+ runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
+ runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
+
+ // expected in request
+ // status code, status message, no ff content
+ // server response message body into attribute of ff
+ final MockFlowFile bundle1 = runner.getFlowFilesForRelationship(InvokeHTTP.REL_SUCCESS_REQ).get(0);
+ bundle1.assertContentEquals("".getBytes("UTF-8"));
+ bundle1.assertAttributeEquals("outputBody", "/status/200");
+ bundle1.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200");
+ bundle1.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK");
}
@Test
@@ -132,9 +460,9 @@ public abstract class TestInvokeHttpCommon {
final String username = "basic_user";
final String password = "basic_password";
- runner.setProperty(InvokeHTTP.Config.PROP_URL, url + "/status/200");
- runner.setProperty(InvokeHTTP.Config.PROP_BASIC_AUTH_USERNAME, username);
- runner.setProperty(InvokeHTTP.Config.PROP_BASIC_AUTH_PASSWORD, password);
+ runner.setProperty(InvokeHTTP.PROP_URL, url + "/status/200");
+ runner.setProperty(InvokeHTTP.PROP_BASIC_AUTH_USERNAME, username);
+ runner.setProperty(InvokeHTTP.PROP_BASIC_AUTH_PASSWORD, password);
final byte[] creds = String.format("%s:%s", username, password).getBytes(StandardCharsets.UTF_8);
final String expAuth = String.format("Basic %s", new String(encodeBase64(creds)));
@@ -142,28 +470,28 @@ public abstract class TestInvokeHttpCommon {
runner.run();
- runner.assertTransferCount(InvokeHTTP.Config.REL_SUCCESS_REQ, 1);
- runner.assertTransferCount(InvokeHTTP.Config.REL_SUCCESS_RESP, 1);
- runner.assertTransferCount(InvokeHTTP.Config.REL_RETRY, 0);
- runner.assertTransferCount(InvokeHTTP.Config.REL_NO_RETRY, 0);
- runner.assertTransferCount(InvokeHTTP.Config.REL_FAILURE, 0);
+ runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 1);
+ runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 1);
+ runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
+ runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
+ runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
// expected in request status.code and status.message
// original flow file (+attributes)
- final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.Config.REL_SUCCESS_REQ).get(0);
+ final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_SUCCESS_REQ).get(0);
bundle.assertContentEquals("Hello".getBytes("UTF-8"));
- bundle.assertAttributeEquals(InvokeHTTP.Config.STATUS_CODE, "200");
- bundle.assertAttributeEquals(InvokeHTTP.Config.STATUS_MESSAGE, "OK");
+ bundle.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200");
+ bundle.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK");
bundle.assertAttributeEquals("Foo", "Bar");
// expected in response
// status code, status message, all headers from server response --> ff attributes
// server response message body into payload of ff
- final MockFlowFile bundle1 = runner.getFlowFilesForRelationship(InvokeHTTP.Config.REL_SUCCESS_RESP).get(0);
+ final MockFlowFile bundle1 = runner.getFlowFilesForRelationship(InvokeHTTP.REL_RESPONSE).get(0);
final String bundle1Content = new String(bundle1.toByteArray(), StandardCharsets.UTF_8);
assertTrue(bundle1Content.startsWith(expAuth)); // use startsWith instead of equals so we can ignore line endings
- bundle1.assertAttributeEquals(InvokeHTTP.Config.STATUS_CODE, "200");
- bundle1.assertAttributeEquals(InvokeHTTP.Config.STATUS_MESSAGE, "OK");
+ bundle1.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200");
+ bundle1.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK");
bundle1.assertAttributeEquals("Foo", "Bar");
bundle1.assertAttributeEquals("Content-Type", "text/plain; charset=ISO-8859-1");
@@ -190,55 +518,140 @@ public abstract class TestInvokeHttpCommon {
final String username = "basic_user";
final String password = "basic_password";
- runner.setProperty(InvokeHTTP.Config.PROP_URL, url + "/status/401");
- runner.setProperty(InvokeHTTP.Config.PROP_BASIC_AUTH_USERNAME, username);
- runner.setProperty(InvokeHTTP.Config.PROP_BASIC_AUTH_PASSWORD, password);
+ runner.setProperty(InvokeHTTP.PROP_URL, url + "/status/401");
+ runner.setProperty(InvokeHTTP.PROP_BASIC_AUTH_USERNAME, username);
+ runner.setProperty(InvokeHTTP.PROP_BASIC_AUTH_PASSWORD, password);
createFlowFiles(runner);
runner.run();
- runner.assertTransferCount(InvokeHTTP.Config.REL_SUCCESS_REQ, 0);
- runner.assertTransferCount(InvokeHTTP.Config.REL_SUCCESS_RESP, 0);
- runner.assertTransferCount(InvokeHTTP.Config.REL_RETRY, 0);
- runner.assertTransferCount(InvokeHTTP.Config.REL_NO_RETRY, 1);
- runner.assertTransferCount(InvokeHTTP.Config.REL_FAILURE, 0);
+ runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 0);
+ runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 0);
+ runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
+ runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 1);
+ runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
// expected in request status.code and status.message
// original flow file (+attributes)
- final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.Config.REL_NO_RETRY).get(0);
- bundle.assertAttributeEquals(InvokeHTTP.Config.STATUS_CODE, "401");
- bundle.assertAttributeEquals(InvokeHTTP.Config.STATUS_MESSAGE, "Unauthorized");
+ final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_NO_RETRY).get(0);
+ bundle.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "401");
+ bundle.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "Unauthorized");
bundle.assertAttributeEquals("Foo", "Bar");
final String actual = new String(bundle.toByteArray(), StandardCharsets.UTF_8);
final String expected = "Hello";
Assert.assertEquals(expected, actual);
- final String response = bundle.getAttribute(InvokeHTTP.Config.RESPONSE_BODY);
- assertEquals(response, "Get off my lawn!");
+ final String response = bundle.getAttribute(InvokeHTTP.RESPONSE_BODY);
+ assertEquals(response, "Get off my lawn!"+System.lineSeparator());
+ }
+
+ @Test
+ public void test200DigestAuth() throws Exception {
+ addHandler(new DigestAuthHandler());
+ final String username = "basic_user";
+ final String password = "basic_password";
+
+ runner.setProperty(InvokeHTTP.PROP_URL, url + "/status/200");
+ runner.setProperty(InvokeHTTP.PROP_BASIC_AUTH_USERNAME, username);
+ runner.setProperty(InvokeHTTP.PROP_BASIC_AUTH_PASSWORD, password);
+ runner.setProperty(InvokeHTTP.PROP_DIGEST_AUTH,"true");
+
+ createFlowFiles(runner);
+
+ runner.run();
+
+ runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 1);
+ runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 1);
+ runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
+ runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
+ runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
+
+ //expected in request status.code and status.message
+ //original flow file (+attributes)
+ final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_SUCCESS_REQ).get(0);
+ bundle.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200");
+ bundle.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK");
+ bundle.assertAttributeEquals("Foo", "Bar");
+ final String actual = new String(bundle.toByteArray(), StandardCharsets.UTF_8);
+ final String expected = "Hello";
+ Assert.assertEquals(expected, actual);
+
+ //expected in response
+ //status code, status message, all headers from server response --> ff attributes
+ //server response message body into payload of ff
+ final MockFlowFile bundle1 = runner.getFlowFilesForRelationship(InvokeHTTP.REL_RESPONSE).get(0);
+ bundle1.assertContentEquals(("DIGEST"+System.lineSeparator()).getBytes("UTF-8"));
+ bundle1.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200");
+ bundle1.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK");
+ bundle1.assertAttributeEquals("Foo", "Bar");
+ bundle1.assertAttributeEquals("Content-Type", "text/plain; charset=ISO-8859-1");
+ }
+
+ @Test
+ public void test401DigestNotAuth() throws Exception {
+ addHandler(new DigestAuthHandler());
+
+ runner.setProperty(InvokeHTTP.PROP_URL, url + "/status/200");
+ runner.setProperty(InvokeHTTP.PROP_DIGEST_AUTH,"false");
+ runner.setProperty(InvokeHTTP.PROP_PUT_ATTRIBUTE_MAX_LENGTH,"512");
+
+ createFlowFiles(runner);
+
+ runner.run();
+
+ runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 0);
+ runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 0);
+ runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
+ runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 1);
+ runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
+
+ //expected in request status.code and status.message
+ //original flow file (+attributes)
+ final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_NO_RETRY).get(0);
+ bundle.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "401");
+ bundle.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "Unauthorized");
+ bundle.assertAttributeEquals("Foo", "Bar");
+ final String actual = new String(bundle.toByteArray(), StandardCharsets.UTF_8);
+ final String expected = "Hello";
+ Assert.assertEquals(expected, actual);
+
+ final String response = bundle.getAttribute(InvokeHTTP.RESPONSE_BODY);
+ assertEquals("\n" +
+ "\n" +
+ "\n" +
+ "Error 401 \n" +
+ "\n" +
+ "\n" +
+ "HTTP ERROR: 401
\n" +
+ "Problem accessing /status/200. Reason:\n" +
+ "
Unauthorized
\n" +
+ "
Powered by Jetty://\n" +
+ "\n" +
+ "\n", response);
}
@Test
public void test500() throws Exception {
addHandler(new GetOrHeadHandler());
- runner.setProperty(InvokeHTTP.Config.PROP_URL, url + "/status/500");
+ runner.setProperty(InvokeHTTP.PROP_URL, url + "/status/500");
createFlowFiles(runner);
runner.run();
- runner.assertTransferCount(InvokeHTTP.Config.REL_SUCCESS_REQ, 0);
- runner.assertTransferCount(InvokeHTTP.Config.REL_SUCCESS_RESP, 0);
- runner.assertTransferCount(InvokeHTTP.Config.REL_RETRY, 1);
- runner.assertTransferCount(InvokeHTTP.Config.REL_NO_RETRY, 0);
- runner.assertTransferCount(InvokeHTTP.Config.REL_FAILURE, 0);
+ runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 0);
+ runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 0);
+ runner.assertTransferCount(InvokeHTTP.REL_RETRY, 1);
+ runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
+ runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
// expected in response
- final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.Config.REL_RETRY).get(0);
+ final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_RETRY).get(0);
final String actual = new String(bundle.toByteArray(), StandardCharsets.UTF_8);
- bundle.assertAttributeEquals(InvokeHTTP.Config.STATUS_CODE, "500");
- bundle.assertAttributeEquals(InvokeHTTP.Config.STATUS_MESSAGE, "Server Error");
- bundle.assertAttributeEquals(InvokeHTTP.Config.RESPONSE_BODY, "/status/500");
+ bundle.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "500");
+ bundle.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "Server Error");
+ bundle.assertAttributeEquals(InvokeHTTP.RESPONSE_BODY, "/status/500");
final String expected = "Hello";
Assert.assertEquals(expected, actual);
@@ -250,23 +663,23 @@ public abstract class TestInvokeHttpCommon {
public void test300() throws Exception {
addHandler(new GetOrHeadHandler());
- runner.setProperty(InvokeHTTP.Config.PROP_URL, url + "/status/302");
+ runner.setProperty(InvokeHTTP.PROP_URL, url + "/status/302");
createFlowFiles(runner);
runner.run();
- runner.assertTransferCount(InvokeHTTP.Config.REL_SUCCESS_REQ, 0);
- runner.assertTransferCount(InvokeHTTP.Config.REL_SUCCESS_RESP, 0);
- runner.assertTransferCount(InvokeHTTP.Config.REL_RETRY, 0);
- runner.assertTransferCount(InvokeHTTP.Config.REL_NO_RETRY, 1);
- runner.assertTransferCount(InvokeHTTP.Config.REL_FAILURE, 0);
+ runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 0);
+ runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 0);
+ runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
+ runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 1);
+ runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
// getMyFlowFiles();
// expected in response
- final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.Config.REL_NO_RETRY).get(0);
+ final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_NO_RETRY).get(0);
final String actual = new String(bundle.toByteArray(), StandardCharsets.UTF_8);
- bundle.assertAttributeEquals(InvokeHTTP.Config.STATUS_CODE, "302");
- bundle.assertAttributeEquals(InvokeHTTP.Config.STATUS_MESSAGE, "Found");
+ bundle.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "302");
+ bundle.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "Found");
final String expected = "Hello";
Assert.assertEquals(expected, actual);
bundle.assertAttributeEquals("Foo", "Bar");
@@ -277,23 +690,23 @@ public abstract class TestInvokeHttpCommon {
public void test304() throws Exception {
addHandler(new GetOrHeadHandler());
- runner.setProperty(InvokeHTTP.Config.PROP_URL, url + "/status/304");
+ runner.setProperty(InvokeHTTP.PROP_URL, url + "/status/304");
createFlowFiles(runner);
runner.run();
- runner.assertTransferCount(InvokeHTTP.Config.REL_SUCCESS_REQ, 0);
- runner.assertTransferCount(InvokeHTTP.Config.REL_SUCCESS_RESP, 0);
- runner.assertTransferCount(InvokeHTTP.Config.REL_RETRY, 0);
- runner.assertTransferCount(InvokeHTTP.Config.REL_NO_RETRY, 1);
- runner.assertTransferCount(InvokeHTTP.Config.REL_FAILURE, 0);
+ runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 0);
+ runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 0);
+ runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
+ runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 1);
+ runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
// getMyFlowFiles();
// expected in response
- final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.Config.REL_NO_RETRY).get(0);
+ final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_NO_RETRY).get(0);
final String actual = new String(bundle.toByteArray(), StandardCharsets.UTF_8);
- bundle.assertAttributeEquals(InvokeHTTP.Config.STATUS_CODE, "304");
- bundle.assertAttributeEquals(InvokeHTTP.Config.STATUS_MESSAGE, "Not Modified");
+ bundle.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "304");
+ bundle.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "Not Modified");
final String expected = "Hello";
Assert.assertEquals(expected, actual);
bundle.assertAttributeEquals("Foo", "Bar");
@@ -304,24 +717,24 @@ public abstract class TestInvokeHttpCommon {
public void test400() throws Exception {
addHandler(new GetOrHeadHandler());
- runner.setProperty(InvokeHTTP.Config.PROP_URL, url + "/status/400");
+ runner.setProperty(InvokeHTTP.PROP_URL, url + "/status/400");
createFlowFiles(runner);
runner.run();
- runner.assertTransferCount(InvokeHTTP.Config.REL_SUCCESS_REQ, 0);
- runner.assertTransferCount(InvokeHTTP.Config.REL_SUCCESS_RESP, 0);
- runner.assertTransferCount(InvokeHTTP.Config.REL_RETRY, 0);
- runner.assertTransferCount(InvokeHTTP.Config.REL_NO_RETRY, 1);
- runner.assertTransferCount(InvokeHTTP.Config.REL_FAILURE, 0);
+ runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 0);
+ runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 0);
+ runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
+ runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 1);
+ runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
// getMyFlowFiles();
// expected in response
- final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.Config.REL_NO_RETRY).get(0);
+ final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_NO_RETRY).get(0);
final String actual = new String(bundle.toByteArray(), StandardCharsets.UTF_8);
- bundle.assertAttributeEquals(InvokeHTTP.Config.STATUS_CODE, "400");
- bundle.assertAttributeEquals(InvokeHTTP.Config.STATUS_MESSAGE, "Bad Request");
- bundle.assertAttributeEquals(InvokeHTTP.Config.RESPONSE_BODY, "/status/400");
+ bundle.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "400");
+ bundle.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "Bad Request");
+ bundle.assertAttributeEquals(InvokeHTTP.RESPONSE_BODY, "/status/400");
final String expected = "Hello";
Assert.assertEquals(expected, actual);
bundle.assertAttributeEquals("Foo", "Bar");
@@ -332,25 +745,25 @@ public abstract class TestInvokeHttpCommon {
public void test412() throws Exception {
addHandler(new GetOrHeadHandler());
- runner.setProperty(InvokeHTTP.Config.PROP_URL, url + "/status/412");
- runner.setProperty(InvokeHTTP.Config.PROP_METHOD, "GET");
+ runner.setProperty(InvokeHTTP.PROP_URL, url + "/status/412");
+ runner.setProperty(InvokeHTTP.PROP_METHOD, "GET");
createFlowFiles(runner);
runner.run();
- runner.assertTransferCount(InvokeHTTP.Config.REL_SUCCESS_REQ, 0);
- runner.assertTransferCount(InvokeHTTP.Config.REL_SUCCESS_RESP, 0);
- runner.assertTransferCount(InvokeHTTP.Config.REL_RETRY, 0);
- runner.assertTransferCount(InvokeHTTP.Config.REL_NO_RETRY, 1);
- runner.assertTransferCount(InvokeHTTP.Config.REL_FAILURE, 0);
+ runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 0);
+ runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 0);
+ runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
+ runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 1);
+ runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
// expected in response
- final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.Config.REL_NO_RETRY).get(0);
+ final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_NO_RETRY).get(0);
final String actual = new String(bundle.toByteArray(), StandardCharsets.UTF_8);
- bundle.assertAttributeEquals(InvokeHTTP.Config.STATUS_CODE, "412");
- bundle.assertAttributeEquals(InvokeHTTP.Config.STATUS_MESSAGE, "Precondition Failed");
- bundle.assertAttributeEquals(InvokeHTTP.Config.RESPONSE_BODY, "/status/412");
+ bundle.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "412");
+ bundle.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "Precondition Failed");
+ bundle.assertAttributeEquals(InvokeHTTP.RESPONSE_BODY, "/status/412");
final String expected = "Hello";
Assert.assertEquals(expected, actual);
bundle.assertAttributeEquals("Foo", "Bar");
@@ -361,28 +774,28 @@ public abstract class TestInvokeHttpCommon {
public void testHead() throws Exception {
addHandler(new GetOrHeadHandler());
- runner.setProperty(InvokeHTTP.Config.PROP_METHOD, "HEAD");
- runner.setProperty(InvokeHTTP.Config.PROP_URL, url + "/status/200");
+ runner.setProperty(InvokeHTTP.PROP_METHOD, "HEAD");
+ runner.setProperty(InvokeHTTP.PROP_URL, url + "/status/200");
createFlowFiles(runner);
runner.run();
- runner.assertTransferCount(InvokeHTTP.Config.REL_SUCCESS_REQ, 1);
- runner.assertTransferCount(InvokeHTTP.Config.REL_SUCCESS_RESP, 1);
- runner.assertTransferCount(InvokeHTTP.Config.REL_RETRY, 0);
- runner.assertTransferCount(InvokeHTTP.Config.REL_NO_RETRY, 0);
- runner.assertTransferCount(InvokeHTTP.Config.REL_FAILURE, 0);
+ runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 1);
+ runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 1);
+ runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
+ runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
+ runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
- final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.Config.REL_SUCCESS_REQ).get(0);
+ final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_SUCCESS_REQ).get(0);
bundle.assertContentEquals("Hello".getBytes("UTF-8"));
- bundle.assertAttributeEquals(InvokeHTTP.Config.STATUS_CODE, "200");
- bundle.assertAttributeEquals(InvokeHTTP.Config.STATUS_MESSAGE, "OK");
+ bundle.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200");
+ bundle.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK");
bundle.assertAttributeEquals("Foo", "Bar");
- final MockFlowFile bundle1 = runner.getFlowFilesForRelationship(InvokeHTTP.Config.REL_SUCCESS_RESP).get(0);
+ final MockFlowFile bundle1 = runner.getFlowFilesForRelationship(InvokeHTTP.REL_RESPONSE).get(0);
bundle1.assertContentEquals("".getBytes("UTF-8"));
- bundle1.assertAttributeEquals(InvokeHTTP.Config.STATUS_CODE, "200");
- bundle1.assertAttributeEquals(InvokeHTTP.Config.STATUS_MESSAGE, "OK");
+ bundle1.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200");
+ bundle1.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK");
bundle1.assertAttributeEquals("Foo", "Bar");
bundle1.assertAttributeEquals("Content-Type", "text/plain");
final String actual1 = new String(bundle1.toByteArray(), StandardCharsets.UTF_8);
@@ -394,28 +807,28 @@ public abstract class TestInvokeHttpCommon {
public void testPost() throws Exception {
addHandler(new PostHandler());
- runner.setProperty(InvokeHTTP.Config.PROP_METHOD, "POST");
- runner.setProperty(InvokeHTTP.Config.PROP_URL, url + "/post");
+ runner.setProperty(InvokeHTTP.PROP_METHOD, "POST");
+ runner.setProperty(InvokeHTTP.PROP_URL, url + "/post");
createFlowFiles(runner);
runner.run();
- runner.assertTransferCount(InvokeHTTP.Config.REL_SUCCESS_REQ, 1);
- runner.assertTransferCount(InvokeHTTP.Config.REL_SUCCESS_RESP, 1);
- runner.assertTransferCount(InvokeHTTP.Config.REL_RETRY, 0);
- runner.assertTransferCount(InvokeHTTP.Config.REL_NO_RETRY, 0);
- runner.assertTransferCount(InvokeHTTP.Config.REL_FAILURE, 0);
+ runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 1);
+ runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 1);
+ runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
+ runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
+ runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
- final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.Config.REL_SUCCESS_REQ).get(0);
+ final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_SUCCESS_REQ).get(0);
bundle.assertContentEquals("Hello".getBytes("UTF-8"));
- bundle.assertAttributeEquals(InvokeHTTP.Config.STATUS_CODE, "200");
- bundle.assertAttributeEquals(InvokeHTTP.Config.STATUS_MESSAGE, "OK");
+ bundle.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200");
+ bundle.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK");
bundle.assertAttributeEquals("Foo", "Bar");
- final MockFlowFile bundle1 = runner.getFlowFilesForRelationship(InvokeHTTP.Config.REL_SUCCESS_RESP).get(0);
+ final MockFlowFile bundle1 = runner.getFlowFilesForRelationship(InvokeHTTP.REL_RESPONSE).get(0);
bundle1.assertContentEquals("".getBytes("UTF-8"));
- bundle1.assertAttributeEquals(InvokeHTTP.Config.STATUS_CODE, "200");
- bundle1.assertAttributeEquals(InvokeHTTP.Config.STATUS_MESSAGE, "OK");
+ bundle1.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200");
+ bundle1.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK");
bundle1.assertAttributeEquals("Foo", "Bar");
bundle1.assertAttributeNotExists("Content-Type");
@@ -426,30 +839,30 @@ public abstract class TestInvokeHttpCommon {
@Test
public void testPut() throws Exception {
- addHandler(new PostHandler());
+ addHandler(new PutHandler());
- runner.setProperty(InvokeHTTP.Config.PROP_METHOD, "PUT");
- runner.setProperty(InvokeHTTP.Config.PROP_URL, url + "/post");
+ runner.setProperty(InvokeHTTP.PROP_METHOD, "PUT");
+ runner.setProperty(InvokeHTTP.PROP_URL, url + "/post");
createFlowFiles(runner);
runner.run();
- runner.assertTransferCount(InvokeHTTP.Config.REL_SUCCESS_REQ, 1);
- runner.assertTransferCount(InvokeHTTP.Config.REL_SUCCESS_RESP, 1);
- runner.assertTransferCount(InvokeHTTP.Config.REL_RETRY, 0);
- runner.assertTransferCount(InvokeHTTP.Config.REL_NO_RETRY, 0);
- runner.assertTransferCount(InvokeHTTP.Config.REL_FAILURE, 0);
+ runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 1);
+ runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 1);
+ runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
+ runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
+ runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
- final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.Config.REL_SUCCESS_REQ).get(0);
+ final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_SUCCESS_REQ).get(0);
bundle.assertContentEquals("Hello".getBytes("UTF-8"));
- bundle.assertAttributeEquals(InvokeHTTP.Config.STATUS_CODE, "200");
- bundle.assertAttributeEquals(InvokeHTTP.Config.STATUS_MESSAGE, "OK");
+ bundle.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200");
+ bundle.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK");
bundle.assertAttributeEquals("Foo", "Bar");
- final MockFlowFile bundle1 = runner.getFlowFilesForRelationship(InvokeHTTP.Config.REL_SUCCESS_RESP).get(0);
+ final MockFlowFile bundle1 = runner.getFlowFilesForRelationship(InvokeHTTP.REL_RESPONSE).get(0);
bundle1.assertContentEquals("".getBytes("UTF-8"));
- bundle1.assertAttributeEquals(InvokeHTTP.Config.STATUS_CODE, "200");
- bundle1.assertAttributeEquals(InvokeHTTP.Config.STATUS_MESSAGE, "OK");
+ bundle1.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200");
+ bundle1.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK");
bundle1.assertAttributeEquals("Foo", "Bar");
bundle1.assertAttributeNotExists("Content-Type");
@@ -462,28 +875,28 @@ public abstract class TestInvokeHttpCommon {
public void testDelete() throws Exception {
addHandler(new DeleteHandler());
- runner.setProperty(InvokeHTTP.Config.PROP_METHOD, "DELETE");
- runner.setProperty(InvokeHTTP.Config.PROP_URL, url + "/status/200");
+ runner.setProperty(InvokeHTTP.PROP_METHOD, "DELETE");
+ runner.setProperty(InvokeHTTP.PROP_URL, url + "/status/200");
createFlowFiles(runner);
runner.run();
- runner.assertTransferCount(InvokeHTTP.Config.REL_SUCCESS_REQ, 1);
- runner.assertTransferCount(InvokeHTTP.Config.REL_SUCCESS_RESP, 1);
- runner.assertTransferCount(InvokeHTTP.Config.REL_RETRY, 0);
- runner.assertTransferCount(InvokeHTTP.Config.REL_NO_RETRY, 0);
- runner.assertTransferCount(InvokeHTTP.Config.REL_FAILURE, 0);
+ runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 1);
+ runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 1);
+ runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
+ runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
+ runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
- final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.Config.REL_SUCCESS_REQ).get(0);
+ final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_SUCCESS_REQ).get(0);
bundle.assertContentEquals("Hello".getBytes("UTF-8"));
- bundle.assertAttributeEquals(InvokeHTTP.Config.STATUS_CODE, "200");
- bundle.assertAttributeEquals(InvokeHTTP.Config.STATUS_MESSAGE, "OK");
+ bundle.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200");
+ bundle.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK");
bundle.assertAttributeEquals("Foo", "Bar");
- final MockFlowFile bundle1 = runner.getFlowFilesForRelationship(InvokeHTTP.Config.REL_SUCCESS_RESP).get(0);
+ final MockFlowFile bundle1 = runner.getFlowFilesForRelationship(InvokeHTTP.REL_RESPONSE).get(0);
bundle1.assertContentEquals("".getBytes("UTF-8"));
- bundle1.assertAttributeEquals(InvokeHTTP.Config.STATUS_CODE, "200");
- bundle1.assertAttributeEquals(InvokeHTTP.Config.STATUS_MESSAGE, "OK");
+ bundle1.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200");
+ bundle1.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK");
bundle1.assertAttributeEquals("Foo", "Bar");
final String actual1 = new String(bundle1.toByteArray(), StandardCharsets.UTF_8);
final String expected1 = "";
@@ -494,28 +907,28 @@ public abstract class TestInvokeHttpCommon {
public void testOptions() throws Exception {
addHandler(new OptionsHandler());
- runner.setProperty(InvokeHTTP.Config.PROP_METHOD, "OPTIONS");
- runner.setProperty(InvokeHTTP.Config.PROP_URL, url + "/status/200");
+ runner.setProperty(InvokeHTTP.PROP_METHOD, "OPTIONS");
+ runner.setProperty(InvokeHTTP.PROP_URL, url + "/status/200");
createFlowFiles(runner);
runner.run();
- runner.assertTransferCount(InvokeHTTP.Config.REL_SUCCESS_REQ, 1);
- runner.assertTransferCount(InvokeHTTP.Config.REL_SUCCESS_RESP, 1);
- runner.assertTransferCount(InvokeHTTP.Config.REL_RETRY, 0);
- runner.assertTransferCount(InvokeHTTP.Config.REL_NO_RETRY, 0);
- runner.assertTransferCount(InvokeHTTP.Config.REL_FAILURE, 0);
+ runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 1);
+ runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 1);
+ runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
+ runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
+ runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
- final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.Config.REL_SUCCESS_REQ).get(0);
+ final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_SUCCESS_REQ).get(0);
bundle.assertContentEquals("Hello".getBytes("UTF-8"));
- bundle.assertAttributeEquals(InvokeHTTP.Config.STATUS_CODE, "200");
- bundle.assertAttributeEquals(InvokeHTTP.Config.STATUS_MESSAGE, "OK");
+ bundle.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200");
+ bundle.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK");
bundle.assertAttributeEquals("Foo", "Bar");
- final MockFlowFile bundle1 = runner.getFlowFilesForRelationship(InvokeHTTP.Config.REL_SUCCESS_RESP).get(0);
+ final MockFlowFile bundle1 = runner.getFlowFilesForRelationship(InvokeHTTP.REL_RESPONSE).get(0);
bundle1.assertContentEquals("/status/200".getBytes("UTF-8"));
- bundle1.assertAttributeEquals(InvokeHTTP.Config.STATUS_CODE, "200");
- bundle1.assertAttributeEquals(InvokeHTTP.Config.STATUS_MESSAGE, "OK");
+ bundle1.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200");
+ bundle1.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK");
bundle1.assertAttributeEquals("Foo", "Bar");
}
@@ -523,34 +936,36 @@ public abstract class TestInvokeHttpCommon {
public void testSendAttributes() throws Exception {
addHandler(new AttributesSentHandler());
- runner.setProperty(InvokeHTTP.Config.PROP_URL, url + "/status/200");
- runner.setProperty(InvokeHTTP.Config.PROP_ATTRIBUTES_TO_SEND, "Foo");
+ runner.setProperty(InvokeHTTP.PROP_URL, url + "/status/200");
+ runner.setProperty(InvokeHTTP.PROP_ATTRIBUTES_TO_SEND, "F.*");
+ runner.setProperty("dynamicHeader","yes!");
createFlowFiles(runner);
runner.run();
- runner.assertTransferCount(InvokeHTTP.Config.REL_SUCCESS_REQ, 1);
- runner.assertTransferCount(InvokeHTTP.Config.REL_SUCCESS_RESP, 1);
- runner.assertTransferCount(InvokeHTTP.Config.REL_RETRY, 0);
- runner.assertTransferCount(InvokeHTTP.Config.REL_NO_RETRY, 0);
- runner.assertTransferCount(InvokeHTTP.Config.REL_FAILURE, 0);
+ runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 1);
+ runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 1);
+ runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
+ runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
+ runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
//expected in request status.code and status.message
//original flow file (+attributes)
- final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.Config.REL_SUCCESS_REQ).get(0);
+ final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_SUCCESS_REQ).get(0);
bundle.assertContentEquals("Hello".getBytes("UTF-8"));
- bundle.assertAttributeEquals(InvokeHTTP.Config.STATUS_CODE, "200");
- bundle.assertAttributeEquals(InvokeHTTP.Config.STATUS_MESSAGE, "OK");
+ bundle.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200");
+ bundle.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK");
bundle.assertAttributeEquals("Foo", "Bar");
//expected in response
//status code, status message, all headers from server response --> ff attributes
//server response message body into payload of ff
- final MockFlowFile bundle1 = runner.getFlowFilesForRelationship(InvokeHTTP.Config.REL_SUCCESS_RESP).get(0);
+ final MockFlowFile bundle1 = runner.getFlowFilesForRelationship(InvokeHTTP.REL_RESPONSE).get(0);
bundle1.assertContentEquals("Bar".getBytes("UTF-8"));
- bundle1.assertAttributeEquals(InvokeHTTP.Config.STATUS_CODE, "200");
- bundle1.assertAttributeEquals(InvokeHTTP.Config.STATUS_MESSAGE, "OK");
+ bundle1.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200");
+ bundle1.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK");
+ bundle1.assertAttributeEquals("dynamicHeader","yes!");
bundle1.assertAttributeEquals("Foo", "Bar");
bundle1.assertAttributeEquals("Content-Type", "text/plain; charset=ISO-8859-1");
}
@@ -559,19 +974,19 @@ public abstract class TestInvokeHttpCommon {
public void testReadTimeout() throws Exception {
addHandler(new ReadTimeoutHandler());
- runner.setProperty(InvokeHTTP.Config.PROP_URL, url + "/status/200");
- runner.setProperty(InvokeHTTP.Config.PROP_READ_TIMEOUT, "5 secs");
+ runner.setProperty(InvokeHTTP.PROP_URL, url + "/status/200");
+ runner.setProperty(InvokeHTTP.PROP_READ_TIMEOUT, "5 secs");
createFlowFiles(runner);
runner.run();
- runner.assertTransferCount(InvokeHTTP.Config.REL_SUCCESS_REQ, 0);
- runner.assertTransferCount(InvokeHTTP.Config.REL_SUCCESS_RESP, 0);
- runner.assertTransferCount(InvokeHTTP.Config.REL_RETRY, 0);
- runner.assertTransferCount(InvokeHTTP.Config.REL_NO_RETRY, 0);
- runner.assertTransferCount(InvokeHTTP.Config.REL_FAILURE, 1);
+ runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 0);
+ runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 0);
+ runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
+ runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
+ runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 1);
- final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.Config.REL_FAILURE).get(0);
+ final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_FAILURE).get(0);
final String actual = new String(bundle.toByteArray(), StandardCharsets.UTF_8);
final String expected = "Hello";
@@ -586,17 +1001,17 @@ public abstract class TestInvokeHttpCommon {
// this is the bad urls
final String badurlport = "http://localhost:" + 445;
- runner.setProperty(InvokeHTTP.Config.PROP_URL, badurlport + "/doesnotExist");
+ runner.setProperty(InvokeHTTP.PROP_URL, badurlport + "/doesnotExist");
createFlowFiles(runner);
runner.run();
- runner.assertTransferCount(InvokeHTTP.Config.REL_SUCCESS_REQ, 0);
- runner.assertTransferCount(InvokeHTTP.Config.REL_SUCCESS_RESP, 0);
- runner.assertTransferCount(InvokeHTTP.Config.REL_RETRY, 0);
- runner.assertTransferCount(InvokeHTTP.Config.REL_NO_RETRY, 0);
- runner.assertTransferCount(InvokeHTTP.Config.REL_FAILURE, 1);
+ runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 0);
+ runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 0);
+ runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
+ runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
+ runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 1);
- final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.Config.REL_FAILURE).get(0);
+ final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_FAILURE).get(0);
final String actual = new String(bundle.toByteArray(), StandardCharsets.UTF_8);
final String expected = "Hello";
@@ -610,17 +1025,17 @@ public abstract class TestInvokeHttpCommon {
final String badurlhost = "http://localhOOst:" + 445;
- runner.setProperty(InvokeHTTP.Config.PROP_URL, badurlhost + "/doesnotExist");
+ runner.setProperty(InvokeHTTP.PROP_URL, badurlhost + "/doesnotExist");
createFlowFiles(runner);
runner.run();
- runner.assertTransferCount(InvokeHTTP.Config.REL_SUCCESS_REQ, 0);
- runner.assertTransferCount(InvokeHTTP.Config.REL_SUCCESS_RESP, 0);
- runner.assertTransferCount(InvokeHTTP.Config.REL_RETRY, 0);
- runner.assertTransferCount(InvokeHTTP.Config.REL_NO_RETRY, 0);
- runner.assertTransferCount(InvokeHTTP.Config.REL_FAILURE, 1);
+ runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 0);
+ runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 0);
+ runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
+ runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
+ runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 1);
- final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.Config.REL_FAILURE).get(0);
+ final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_FAILURE).get(0);
final String actual = new String(bundle.toByteArray(), StandardCharsets.UTF_8);
final String expected = "Hello";
@@ -628,6 +1043,46 @@ public abstract class TestInvokeHttpCommon {
bundle.assertAttributeEquals("Foo", "Bar");
}
+ @Test
+ public void testArbitraryRequest() throws Exception {
+ addHandler(new FetchHandler());
+
+ runner.setProperty(InvokeHTTP.PROP_URL, url + "/status/200");
+ runner.setProperty(InvokeHTTP.PROP_METHOD,"FETCH");
+
+ createFlowFiles(runner);
+
+ runner.run();
+
+ runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 1);
+ runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 1);
+ runner.assertTransferCount(InvokeHTTP.REL_RETRY, 0);
+ runner.assertTransferCount(InvokeHTTP.REL_NO_RETRY, 0);
+ runner.assertTransferCount(InvokeHTTP.REL_FAILURE, 0);
+
+ //expected in request status.code and status.message
+ //original flow file (+attributes)
+ final MockFlowFile bundle = runner.getFlowFilesForRelationship(InvokeHTTP.REL_SUCCESS_REQ).get(0);
+ bundle.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200");
+ bundle.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK");
+ final String actual = new String(bundle.toByteArray(), StandardCharsets.UTF_8);
+ final String expected = "Hello";
+ Assert.assertEquals(expected, actual);
+ bundle.assertAttributeEquals("Foo", "Bar");
+
+ //expected in response
+ //status code, status message, all headers from server response --> ff attributes
+ //server response message body into payload of ff
+ final MockFlowFile bundle1 = runner.getFlowFilesForRelationship(InvokeHTTP.REL_RESPONSE).get(0);
+ bundle1.assertContentEquals("/status/200".getBytes("UTF-8"));
+ bundle1.assertAttributeEquals(InvokeHTTP.STATUS_CODE, "200");
+ bundle1.assertAttributeEquals(InvokeHTTP.STATUS_MESSAGE, "OK");
+ bundle1.assertAttributeEquals("Foo", "Bar");
+ bundle1.assertAttributeEquals("Content-Type", "text/plain; charset=ISO-8859-1");
+ final String actual1 = new String(bundle1.toByteArray(), StandardCharsets.UTF_8);
+ final String expected1 = "/status/200";
+ Assert.assertEquals(expected1, actual1);
+ }
public static void createFlowFiles(final TestRunner testRunner) throws UnsupportedEncodingException {
final Map attributes = new HashMap<>();
@@ -638,7 +1093,7 @@ public abstract class TestInvokeHttpCommon {
}
- private static class DateHandler extends AbstractHandler {
+ protected static class DateHandler extends AbstractHandler {
private String dateString;
@@ -654,7 +1109,7 @@ public abstract class TestInvokeHttpCommon {
}
}
- private static class PostHandler extends AbstractHandler {
+ public static class PostHandler extends AbstractHandler {
@Override
public void handle(String target, Request baseRequest,
@@ -663,15 +1118,42 @@ public abstract class TestInvokeHttpCommon {
baseRequest.setHandled(true);
- assertEquals("/post", target);
-
- final String body = request.getReader().readLine();
- assertEquals("Hello", body);
+ if("POST".equals(request.getMethod())) {
+ assertEquals("application/plain-text",request.getHeader("Content-Type"));
+ final String body = request.getReader().readLine();
+ assertEquals("Hello", body);
+ } else {
+ response.setStatus(404);
+ response.setContentType("text/plain");
+ response.setContentLength(0);
+ }
}
}
- private static class GetOrHeadHandler extends AbstractHandler {
+ public static class PutHandler extends AbstractHandler {
+
+ @Override
+ public void handle(String target, Request baseRequest,
+ HttpServletRequest request, HttpServletResponse response)
+ throws IOException, ServletException {
+
+ baseRequest.setHandled(true);
+
+ if("PUT".equalsIgnoreCase(request.getMethod())) {
+ assertEquals("application/plain-text",request.getHeader("Content-Type"));
+ final String body = request.getReader().readLine();
+ assertEquals("Hello", body);
+ } else {
+ response.setStatus(404);
+ response.setContentType("text/plain");
+ response.setContentLength(0);
+ }
+
+ }
+ }
+
+ public static class GetOrHeadHandler extends AbstractHandler {
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException {
@@ -688,12 +1170,91 @@ public abstract class TestInvokeHttpCommon {
writer.print(target);
writer.flush();
}
+ } else if(!"HEAD".equalsIgnoreCase(request.getMethod())) {
+ response.setStatus(404);
+ response.setContentType("text/plain");
+ String body = "NO";
+ response.setContentLength(body.length());
+ response.setContentType("text/plain");
+
+ try (PrintWriter writer = response.getWriter()) {
+ writer.print(body);
+ writer.flush();
+ }
}
}
}
- private static class DeleteHandler extends AbstractHandler {
+ public static class GetLargeHandler extends AbstractHandler {
+
+ @Override
+ public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException {
+ baseRequest.setHandled(true);
+
+ final int status = Integer.valueOf(target.substring("/status".length() + 1));
+ response.setStatus(status);
+
+ response.setContentType("text/plain");
+ response.setContentLength(target.length());
+
+ if ("GET".equalsIgnoreCase(request.getMethod())) {
+ try (PrintWriter writer = response.getWriter()) {
+ writer.print(target);
+ writer.flush();
+ }
+ } else {
+ response.setStatus(404);
+ response.setContentType("text/plain");
+
+ //Lorem Ipsum
+ String body = "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. "
+ + "Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor "
+ + "in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, "
+ + "sunt in culpa qui officia deserunt mollit anim id est laborum.";
+
+ response.setContentLength(body.length());
+ response.setContentType("text/plain");
+
+ try (PrintWriter writer = response.getWriter()) {
+ writer.print(body);
+ writer.flush();
+ }
+ }
+
+ }
+ }
+
+ public static class GetMultipleHeaderHandler extends AbstractHandler {
+
+ @Override
+ public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException {
+ baseRequest.setHandled(true);
+
+ final int status = Integer.valueOf(target.substring("/status".length() + 1));
+ response.setStatus(status);
+
+ response.setContentType("text/plain");
+ response.setContentLength(target.length());
+
+ if ("GET".equalsIgnoreCase(request.getMethod())) {
+ response.addHeader("double", "1");
+ response.addHeader("double", "2");
+
+ try (PrintWriter writer = response.getWriter()) {
+ writer.print(target);
+ writer.flush();
+ }
+ } else {
+ response.setStatus(404);
+ response.setContentType("text/plain");
+ response.setContentLength(0);
+ }
+
+ }
+ }
+
+ public static class DeleteHandler extends AbstractHandler {
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException {
@@ -711,7 +1272,7 @@ public abstract class TestInvokeHttpCommon {
}
}
- private static class OptionsHandler extends AbstractHandler {
+ public static class OptionsHandler extends AbstractHandler {
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException {
@@ -735,7 +1296,7 @@ public abstract class TestInvokeHttpCommon {
}
}
- private static class AttributesSentHandler extends AbstractHandler {
+ public static class AttributesSentHandler extends AbstractHandler {
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException {
@@ -743,12 +1304,12 @@ public abstract class TestInvokeHttpCommon {
if ("Get".equalsIgnoreCase(request.getMethod())) {
String headerValue = request.getHeader("Foo");
+ response.setHeader("dynamicHeader",request.getHeader("dynamicHeader"));
final int status = Integer.valueOf(target.substring("/status".length() + 1));
response.setStatus(status);
response.setContentLength(headerValue.length());
response.setContentType("text/plain");
-
try (PrintWriter writer = response.getWriter()) {
writer.print(headerValue);
writer.flush();
@@ -761,7 +1322,7 @@ public abstract class TestInvokeHttpCommon {
}
}
- private static class ReadTimeoutHandler extends AbstractHandler {
+ public static class ReadTimeoutHandler extends AbstractHandler {
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException {
@@ -792,9 +1353,7 @@ public abstract class TestInvokeHttpCommon {
}
}
-
-
- private static class BasicAuthHandler extends AbstractHandler {
+ public static class BasicAuthHandler extends AbstractHandler {
private String authString;
@@ -827,4 +1386,66 @@ public abstract class TestInvokeHttpCommon {
}
}
+ public static class DigestAuthHandler extends AbstractHandler {
+
+ private DigestAuthenticator digestAuthenticator;
+
+ private DigestAuthHandler() {
+ digestAuthenticator = new DigestAuthenticator();
+ ConstraintSecurityHandler securityHandler = new ConstraintSecurityHandler();
+
+ HashLoginService hashLoginService = new HashLoginService("realm");
+ hashLoginService.putUser("basic_user", new Password("basic_password"), new String[]{"realm"});
+ securityHandler.setLoginService(hashLoginService);
+ securityHandler.setIdentityService(new DefaultIdentityService());
+ digestAuthenticator.setConfiguration(securityHandler);
+ }
+
+
+ @Override
+ public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse
+ response)throws IOException, ServletException {
+ baseRequest.setHandled(true);
+
+ try {
+ Authentication authentication = digestAuthenticator.validateRequest(request, response, true);
+
+ if (authentication instanceof Authentication.User) {
+ response.setContentType("text/plain");
+ Authentication.User user = (Authentication.User) authentication;
+ response.getWriter().println(user.getAuthMethod());
+ } else if (authentication instanceof Authentication.ResponseSent) {
+ Authentication.ResponseSent responseSent = (Authentication.ResponseSent) authentication;
+ }
+ } catch (ServerAuthException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ public static class FetchHandler extends AbstractHandler {
+
+ @Override
+ public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException {
+ baseRequest.setHandled(true);
+
+
+ if ("Fetch".equalsIgnoreCase(request.getMethod())) {
+ final int status = Integer.valueOf(target.substring("/status".length() + 1));
+ response.setStatus(status);
+ response.setContentType("text/plain");
+ response.setContentLength(target.length());
+
+ try (PrintWriter writer = response.getWriter()) {
+ writer.print(target);
+ writer.flush();
+ }
+ } else {
+
+ response.setStatus(404);
+ response.setContentType("text/plain");
+ response.setContentLength(target.length());
+ }
+ }
+ }
}
diff --git a/pom.xml b/pom.xml
index 8a8cdb09a7..070881223e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -97,6 +97,44 @@
12.0.1
2.2.0
+
+
+
+
+ central
+
+ Maven Repository
+ https://repo1.maven.org/maven2
+
+ true
+
+
+ false
+
+
+
+ apache-repo
+ Apache Repository
+ https://repository.apache.org/content/repositories/releases
+
+ true
+
+
+ false
+
+
+
+ jcenter
+ http://jcenter.bintray.com
+
+ false
+
+
+ true
+
+
+
+