From 317b2f4a4aae43a3312b704eebe78ece0bf448a5 Mon Sep 17 00:00:00 2001 From: Aldrin Piri Date: Thu, 7 Jan 2016 00:54:49 -0500 Subject: [PATCH] NIFI-1361 Exposing Content-Type as a processor property for both InvokeHTTP and PostHTTP. This makes use of the mime.type attribute via the EL expression ${mime.type}, making more apparent to the user how this header is derived and allowing the explicit setting of a value. --- .../nifi/processors/standard/InvokeHTTP.java | 27 ++- .../nifi/processors/standard/PostHTTP.java | 21 ++- .../processors/standard/CaptureServlet.java | 21 ++- .../processors/standard/TestPostHTTP.java | 54 ++++++ .../standard/util/TestInvokeHttpCommon.java | 154 ++++++++++++++---- 5 files changed, 232 insertions(+), 45 deletions(-) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java index 2eef3c6fff..4e5c22d4b9 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java @@ -120,6 +120,8 @@ public final class InvokeHTTP extends AbstractProcessor { public final static String TRANSACTION_ID = "invokehttp.tx.id"; public final static String REMOTE_DN = "invokehttp.remote.dn"; + public static final String DEFAULT_CONTENT_TYPE = "application/octet-stream"; + // Set of flowfile attributes which we generally always ignore during // processing, including when converting http headers, copying attributes, etc. // This set includes our strings defined above as well as some standard flowfile @@ -212,6 +214,16 @@ public final class InvokeHTTP extends AbstractProcessor { .addValidator(StandardValidators.PORT_VALIDATOR) .build(); + public static final PropertyDescriptor PROP_CONTENT_TYPE = new PropertyDescriptor.Builder() + .name("Content-Type") + .description("The Content-Type to specify for when content is being transmitted through a PUT or POST. " + + "In the case of an empty value after evaluating an expression language expression, Content-Type defaults to " + DEFAULT_CONTENT_TYPE) + .required(true) + .expressionLanguageSupported(true) + .defaultValue("${" + CoreAttributes.MIME_TYPE.key() + "}") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + // Per RFC 7235, 2617, and 2616. // basic-credentials = base64-user-pass // base64-user-pass = userid ":" password @@ -316,7 +328,8 @@ public final class InvokeHTTP extends AbstractProcessor { PROP_DIGEST_AUTH, PROP_OUTPUT_RESPONSE_REGARDLESS, PROP_TRUSTED_HOSTNAME, - PROP_ADD_HEADERS_TO_REQUEST)); + PROP_ADD_HEADERS_TO_REQUEST, + PROP_CONTENT_TYPE)); // relationships public static final Relationship REL_SUCCESS_REQ = new Relationship.Builder() @@ -363,8 +376,6 @@ public final class InvokeHTTP extends AbstractProcessor { private final AtomicReference okHttpClientAtomicReference = new AtomicReference<>(); - public static final String DEFAULT_CONTENT_TYPE = "application/octet-stream"; - @Override protected List getSupportedPropertyDescriptors() { return PROPERTIES; @@ -701,11 +712,11 @@ public final class InvokeHTTP extends AbstractProcessor { requestBuilder = requestBuilder.get(); break; case "POST": - RequestBody requestBody = getRequestBodyToSend(session, requestFlowFile); + RequestBody requestBody = getRequestBodyToSend(session, context, requestFlowFile); requestBuilder = requestBuilder.post(requestBody); break; case "PUT": - requestBody = getRequestBodyToSend(session, requestFlowFile); + requestBody = getRequestBodyToSend(session, context, requestFlowFile); requestBuilder = requestBuilder.put(requestBody); break; case "HEAD": @@ -723,12 +734,12 @@ public final class InvokeHTTP extends AbstractProcessor { return requestBuilder.build(); } - private RequestBody getRequestBodyToSend(final ProcessSession session, final FlowFile requestFlowFile) { + private RequestBody getRequestBodyToSend(final ProcessSession session, final ProcessContext context, final FlowFile requestFlowFile) { return new RequestBody() { @Override public MediaType contentType() { - final String attributeValue = requestFlowFile.getAttribute(CoreAttributes.MIME_TYPE.key()); - String contentType = attributeValue == null ? DEFAULT_CONTENT_TYPE : attributeValue; + String contentType = context.getProperty(PROP_CONTENT_TYPE).evaluateAttributeExpressions(requestFlowFile).getValue(); + contentType = StringUtils.isBlank(contentType) ? DEFAULT_CONTENT_TYPE : contentType; return MediaType.parse(contentType); } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PostHTTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PostHTTP.java index c830ac0d9d..aacdb6ac0b 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PostHTTP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PostHTTP.java @@ -85,7 +85,6 @@ import org.apache.http.protocol.HttpCoreContext; import org.apache.http.util.EntityUtils; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; -import org.apache.nifi.annotation.behavior.ReadsAttribute; import org.apache.nifi.annotation.behavior.SupportsBatching; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; @@ -122,15 +121,15 @@ import org.apache.nifi.util.ObjectHolder; import org.apache.nifi.util.StopWatch; import com.sun.jersey.api.client.ClientResponse.Status; +import org.apache.nifi.util.StringUtils; @SupportsBatching @InputRequirement(Requirement.INPUT_REQUIRED) @Tags({"http", "https", "remote", "copy", "archive"}) @CapabilityDescription("Performs an HTTP Post with the content of the FlowFile") -@ReadsAttribute(attribute = "mime.type", description = "If not sending data as a FlowFile, the mime.type attribute will be used to set the HTTP Header for Content-Type") public class PostHTTP extends AbstractProcessor { - public static final String CONTENT_TYPE = "Content-Type"; + public static final String CONTENT_TYPE_HEADER = "Content-Type"; public static final String ACCEPT = "Accept"; public static final String ACCEPT_ENCODING = "Accept-Encoding"; public static final String APPLICATION_FLOW_FILE_V1 = "application/flowfile"; @@ -249,6 +248,15 @@ public class PostHTTP extends AbstractProcessor { .required(false) .addValidator(StandardValidators.PORT_VALIDATOR) .build(); + public static final PropertyDescriptor CONTENT_TYPE = new PropertyDescriptor.Builder() + .name("Content-Type") + .description("The Content-Type to specify for the content of the FlowFile being POSTed if " + SEND_AS_FLOWFILE.getName() + " is false. " + + "In the case of an empty value after evaluating an expression language expression, Content-Type defaults to " + DEFAULT_CONTENT_TYPE) + .required(true) + .expressionLanguageSupported(true) + .defaultValue("${" + CoreAttributes.MIME_TYPE.key() + "}") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); public static final Relationship REL_SUCCESS = new Relationship.Builder() .name("success") @@ -289,6 +297,7 @@ public class PostHTTP extends AbstractProcessor { properties.add(USER_AGENT); properties.add(PROXY_HOST); properties.add(PROXY_PORT); + properties.add(CONTENT_TYPE); this.properties = Collections.unmodifiableList(properties); } @@ -642,8 +651,8 @@ public class PostHTTP extends AbstractProcessor { return; } } else { - final String attributeValue = toSend.get(0).getAttribute(CoreAttributes.MIME_TYPE.key()); - contentType = attributeValue == null ? DEFAULT_CONTENT_TYPE : attributeValue; + final String contentTypeValue = context.getProperty(CONTENT_TYPE).evaluateAttributeExpressions(toSend.get(0)).getValue(); + contentType = StringUtils.isBlank(contentTypeValue) ? DEFAULT_CONTENT_TYPE : contentTypeValue; } final String attributeHeaderRegex = context.getProperty(ATTRIBUTES_AS_HEADERS_REGEX).getValue(); @@ -659,7 +668,7 @@ public class PostHTTP extends AbstractProcessor { } } - post.setHeader(CONTENT_TYPE, contentType); + post.setHeader(CONTENT_TYPE_HEADER, contentType); post.setHeader(FLOWFILE_CONFIRMATION_HEADER, "true"); post.setHeader(PROTOCOL_VERSION_HEADER, PROTOCOL_VERSION); post.setHeader(TRANSACTION_ID_HEADER, transactionId); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/CaptureServlet.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/CaptureServlet.java index 794d6c0b68..073ff525c2 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/CaptureServlet.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/CaptureServlet.java @@ -17,6 +17,9 @@ package org.apache.nifi.processors.standard; import java.io.IOException; +import java.util.Enumeration; +import java.util.HashMap; +import java.util.Map; import javax.servlet.ServletException; import javax.servlet.http.HttpServlet; @@ -33,18 +36,32 @@ public class CaptureServlet extends HttpServlet { private static final long serialVersionUID = 8402271018449653919L; private volatile byte[] lastPost; + private volatile Map lastPostHeaders; public byte[] getLastPost() { return lastPost; } + public Map getLastPostHeaders() { + return lastPostHeaders; + } + @Override protected void doPost(final HttpServletRequest request, final HttpServletResponse response) throws ServletException, IOException { final ByteArrayOutputStream baos = new ByteArrayOutputStream(); - try{ + + // Capture all the headers for reference. Intentionally choosing to not special handling for headers with multiple values for clarity + final Enumeration headerNames = request.getHeaderNames(); + lastPostHeaders = new HashMap<>(); + while (headerNames.hasMoreElements()) { + final String nextHeader = headerNames.nextElement(); + lastPostHeaders.put(nextHeader, request.getHeader(nextHeader)); + } + + try { StreamUtils.copy(request.getInputStream(), baos); this.lastPost = baos.toByteArray(); - } finally{ + } finally { FileUtils.closeQuietly(baos); } response.setStatus(Status.OK.getStatusCode()); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPostHTTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPostHTTP.java index 060148e183..274a9eda16 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPostHTTP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPostHTTP.java @@ -24,6 +24,7 @@ import java.io.ByteArrayOutputStream; import java.util.HashMap; import java.util.Map; +import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.ssl.SSLContextService; import org.apache.nifi.ssl.StandardSSLContextService; import org.apache.nifi.util.FlowFileUnpackagerV3; @@ -32,6 +33,7 @@ import org.apache.nifi.util.TestRunners; import org.eclipse.jetty.servlet.ServletHandler; import org.junit.After; import org.junit.Test; +import org.junit.Assert; public class TestPostHTTP { @@ -245,4 +247,56 @@ public class TestPostHTTP { assertEquals("xyz.txt", receivedAttrs.get("filename")); } + @Test + public void testSendWithMimeType() throws Exception { + setup(null); + runner.setProperty(PostHTTP.URL, server.getUrl()); + + final Map attrs = new HashMap<>(); + + final String suppliedMimeType = "text/plain"; + attrs.put(CoreAttributes.MIME_TYPE.key(), suppliedMimeType); + runner.enqueue("Camping is in tents.".getBytes(), attrs); + + runner.run(1); + runner.assertAllFlowFilesTransferred(PostHTTP.REL_SUCCESS); + + Map lastPostHeaders = servlet.getLastPostHeaders(); + Assert.assertEquals(suppliedMimeType, lastPostHeaders.get(PostHTTP.CONTENT_TYPE_HEADER)); + } + + @Test + public void testSendWithEmptyELExpression() throws Exception { + setup(null); + runner.setProperty(PostHTTP.URL, server.getUrl()); + + final Map attrs = new HashMap<>(); + attrs.put(CoreAttributes.MIME_TYPE.key(), ""); + runner.enqueue("The wilderness downtown.".getBytes(), attrs); + + runner.run(1); + runner.assertAllFlowFilesTransferred(PostHTTP.REL_SUCCESS); + + Map lastPostHeaders = servlet.getLastPostHeaders(); + Assert.assertEquals(PostHTTP.DEFAULT_CONTENT_TYPE, lastPostHeaders.get(PostHTTP.CONTENT_TYPE_HEADER)); + } + + @Test + public void testSendWithContentTypeProperty() throws Exception { + setup(null); + + final String suppliedMimeType = "text/plain"; + runner.setProperty(PostHTTP.URL, server.getUrl()); + runner.setProperty(PostHTTP.CONTENT_TYPE, suppliedMimeType); + + final Map attrs = new HashMap<>(); + attrs.put(CoreAttributes.MIME_TYPE.key(), "text/csv"); + runner.enqueue("Try this trick and spin it.".getBytes(), attrs); + + runner.run(1); + runner.assertAllFlowFilesTransferred(PostHTTP.REL_SUCCESS); + + Map lastPostHeaders = servlet.getLastPostHeaders(); + Assert.assertEquals(suppliedMimeType, lastPostHeaders.get(PostHTTP.CONTENT_TYPE_HEADER)); + } } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestInvokeHttpCommon.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestInvokeHttpCommon.java index b44f01522c..cf14e0854e 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestInvokeHttpCommon.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestInvokeHttpCommon.java @@ -805,7 +805,7 @@ public abstract class TestInvokeHttpCommon { @Test public void testPost() throws Exception { - addHandler(new PostHandler()); + addHandler(new MutativeMethodHandler(MutativeMethod.POST)); runner.setProperty(InvokeHTTP.PROP_METHOD, "POST"); runner.setProperty(InvokeHTTP.PROP_URL, url + "/post"); @@ -837,9 +837,113 @@ public abstract class TestInvokeHttpCommon { Assert.assertEquals(expected1, actual1); } + @Test + public void testPostWithMimeType() throws Exception { + final String suppliedMimeType = "text/plain"; + addHandler(new MutativeMethodHandler(MutativeMethod.POST, suppliedMimeType)); + + runner.setProperty(InvokeHTTP.PROP_METHOD, "POST"); + runner.setProperty(InvokeHTTP.PROP_URL, url + "/post"); + + final Map attrs = new HashMap<>(); + + attrs.put(CoreAttributes.MIME_TYPE.key(), suppliedMimeType); + runner.enqueue("Hello".getBytes(), attrs); + + runner.run(1); + runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 1); + runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 1); + } + + @Test + public void testPostWithEmptyELExpression() throws Exception { + addHandler(new MutativeMethodHandler(MutativeMethod.POST, InvokeHTTP.DEFAULT_CONTENT_TYPE)); + + runner.setProperty(InvokeHTTP.PROP_METHOD, "POST"); + runner.setProperty(InvokeHTTP.PROP_URL, url + "/post"); + + final Map attrs = new HashMap<>(); + attrs.put(CoreAttributes.MIME_TYPE.key(), ""); + runner.enqueue("Hello".getBytes(), attrs); + + runner.run(1); + runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 1); + runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 1); + } + + @Test + public void testPostWithContentTypeProperty() throws Exception { + final String suppliedMimeType = "text/plain"; + addHandler(new MutativeMethodHandler(MutativeMethod.POST, suppliedMimeType)); + + runner.setProperty(InvokeHTTP.PROP_METHOD, "POST"); + runner.setProperty(InvokeHTTP.PROP_URL, url + "/post"); + runner.setProperty(InvokeHTTP.PROP_CONTENT_TYPE, suppliedMimeType); + + final Map attrs = new HashMap<>(); + attrs.put(CoreAttributes.MIME_TYPE.key(), "text/csv"); + runner.enqueue("Hello".getBytes(), attrs); + + runner.run(1); + runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 1); + runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 1); + } + + @Test + public void testPutWithMimeType() throws Exception { + final String suppliedMimeType = "text/plain"; + addHandler(new MutativeMethodHandler(MutativeMethod.PUT, suppliedMimeType)); + + runner.setProperty(InvokeHTTP.PROP_METHOD, "PUT"); + runner.setProperty(InvokeHTTP.PROP_URL, url + "/post"); + + final Map attrs = new HashMap<>(); + + attrs.put(CoreAttributes.MIME_TYPE.key(), suppliedMimeType); + runner.enqueue("Hello".getBytes(), attrs); + + runner.run(1); + runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 1); + runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 1); + } + + @Test + public void testPutWithEmptyELExpression() throws Exception { + addHandler(new MutativeMethodHandler(MutativeMethod.PUT, InvokeHTTP.DEFAULT_CONTENT_TYPE)); + + runner.setProperty(InvokeHTTP.PROP_METHOD, "PUT"); + runner.setProperty(InvokeHTTP.PROP_URL, url + "/post"); + + final Map attrs = new HashMap<>(); + attrs.put(CoreAttributes.MIME_TYPE.key(), ""); + runner.enqueue("Hello".getBytes(), attrs); + + runner.run(1); + runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 1); + runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 1); + } + + @Test + public void testPutWithContentTypeProperty() throws Exception { + final String suppliedMimeType = "text/plain"; + addHandler(new MutativeMethodHandler(MutativeMethod.PUT, suppliedMimeType)); + + runner.setProperty(InvokeHTTP.PROP_METHOD, "PUT"); + runner.setProperty(InvokeHTTP.PROP_URL, url + "/post"); + runner.setProperty(InvokeHTTP.PROP_CONTENT_TYPE, suppliedMimeType); + + final Map attrs = new HashMap<>(); + attrs.put(CoreAttributes.MIME_TYPE.key(), "text/csv"); + runner.enqueue("Hello".getBytes(), attrs); + + runner.run(1); + runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 1); + runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 1); + } + @Test public void testPut() throws Exception { - addHandler(new PutHandler()); + addHandler(new MutativeMethodHandler(MutativeMethod.PUT)); runner.setProperty(InvokeHTTP.PROP_METHOD, "PUT"); runner.setProperty(InvokeHTTP.PROP_URL, url + "/post"); @@ -1109,17 +1213,30 @@ public abstract class TestInvokeHttpCommon { } } - public static class PostHandler extends AbstractHandler { + private enum MutativeMethod { POST, PUT } + + + public static class MutativeMethodHandler extends AbstractHandler { + private final MutativeMethod method; + private final String expectedContentType; + + public MutativeMethodHandler(final MutativeMethod method) { + this(method, "application/plain-text"); + } + + public MutativeMethodHandler(final MutativeMethod method, final String expectedContentType) { + this.method = method; + this.expectedContentType = expectedContentType; + } @Override - public void handle(String target, Request baseRequest, - HttpServletRequest request, HttpServletResponse response) - throws IOException, ServletException { + public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) + throws IOException, ServletException { baseRequest.setHandled(true); - if("POST".equals(request.getMethod())) { - assertEquals("application/plain-text",request.getHeader("Content-Type")); + if(method.name().equals(request.getMethod())) { + assertEquals(this.expectedContentType,request.getHeader("Content-Type")); final String body = request.getReader().readLine(); assertEquals("Hello", body); } else { @@ -1129,28 +1246,7 @@ public abstract class TestInvokeHttpCommon { } } - } - public static class PutHandler extends AbstractHandler { - - @Override - public void handle(String target, Request baseRequest, - HttpServletRequest request, HttpServletResponse response) - throws IOException, ServletException { - - baseRequest.setHandled(true); - - if("PUT".equalsIgnoreCase(request.getMethod())) { - assertEquals("application/plain-text",request.getHeader("Content-Type")); - final String body = request.getReader().readLine(); - assertEquals("Hello", body); - } else { - response.setStatus(404); - response.setContentType("text/plain"); - response.setContentLength(0); - } - - } } public static class GetOrHeadHandler extends AbstractHandler {