NIFI-1361 Exposing Content-Type as a processor property for both InvokeHTTP and PostHTTP. This makes use of the mime.type attribute via the EL expression ${mime.type}, making more apparent to the user how this header is derived and allowing the explicit setting of a value.

This commit is contained in:
Aldrin Piri 2016-01-07 00:54:49 -05:00
parent 6e9175459f
commit 317b2f4a4a
5 changed files with 232 additions and 45 deletions

View File

@ -120,6 +120,8 @@ public final class InvokeHTTP extends AbstractProcessor {
public final static String TRANSACTION_ID = "invokehttp.tx.id"; public final static String TRANSACTION_ID = "invokehttp.tx.id";
public final static String REMOTE_DN = "invokehttp.remote.dn"; public final static String REMOTE_DN = "invokehttp.remote.dn";
public static final String DEFAULT_CONTENT_TYPE = "application/octet-stream";
// Set of flowfile attributes which we generally always ignore during // Set of flowfile attributes which we generally always ignore during
// processing, including when converting http headers, copying attributes, etc. // processing, including when converting http headers, copying attributes, etc.
// This set includes our strings defined above as well as some standard flowfile // This set includes our strings defined above as well as some standard flowfile
@ -212,6 +214,16 @@ public final class InvokeHTTP extends AbstractProcessor {
.addValidator(StandardValidators.PORT_VALIDATOR) .addValidator(StandardValidators.PORT_VALIDATOR)
.build(); .build();
public static final PropertyDescriptor PROP_CONTENT_TYPE = new PropertyDescriptor.Builder()
.name("Content-Type")
.description("The Content-Type to specify for when content is being transmitted through a PUT or POST. "
+ "In the case of an empty value after evaluating an expression language expression, Content-Type defaults to " + DEFAULT_CONTENT_TYPE)
.required(true)
.expressionLanguageSupported(true)
.defaultValue("${" + CoreAttributes.MIME_TYPE.key() + "}")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
// Per RFC 7235, 2617, and 2616. // Per RFC 7235, 2617, and 2616.
// basic-credentials = base64-user-pass // basic-credentials = base64-user-pass
// base64-user-pass = userid ":" password // base64-user-pass = userid ":" password
@ -316,7 +328,8 @@ public final class InvokeHTTP extends AbstractProcessor {
PROP_DIGEST_AUTH, PROP_DIGEST_AUTH,
PROP_OUTPUT_RESPONSE_REGARDLESS, PROP_OUTPUT_RESPONSE_REGARDLESS,
PROP_TRUSTED_HOSTNAME, PROP_TRUSTED_HOSTNAME,
PROP_ADD_HEADERS_TO_REQUEST)); PROP_ADD_HEADERS_TO_REQUEST,
PROP_CONTENT_TYPE));
// relationships // relationships
public static final Relationship REL_SUCCESS_REQ = new Relationship.Builder() public static final Relationship REL_SUCCESS_REQ = new Relationship.Builder()
@ -363,8 +376,6 @@ public final class InvokeHTTP extends AbstractProcessor {
private final AtomicReference<OkHttpClient> okHttpClientAtomicReference = new AtomicReference<>(); private final AtomicReference<OkHttpClient> okHttpClientAtomicReference = new AtomicReference<>();
public static final String DEFAULT_CONTENT_TYPE = "application/octet-stream";
@Override @Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return PROPERTIES; return PROPERTIES;
@ -701,11 +712,11 @@ public final class InvokeHTTP extends AbstractProcessor {
requestBuilder = requestBuilder.get(); requestBuilder = requestBuilder.get();
break; break;
case "POST": case "POST":
RequestBody requestBody = getRequestBodyToSend(session, requestFlowFile); RequestBody requestBody = getRequestBodyToSend(session, context, requestFlowFile);
requestBuilder = requestBuilder.post(requestBody); requestBuilder = requestBuilder.post(requestBody);
break; break;
case "PUT": case "PUT":
requestBody = getRequestBodyToSend(session, requestFlowFile); requestBody = getRequestBodyToSend(session, context, requestFlowFile);
requestBuilder = requestBuilder.put(requestBody); requestBuilder = requestBuilder.put(requestBody);
break; break;
case "HEAD": case "HEAD":
@ -723,12 +734,12 @@ public final class InvokeHTTP extends AbstractProcessor {
return requestBuilder.build(); return requestBuilder.build();
} }
private RequestBody getRequestBodyToSend(final ProcessSession session, final FlowFile requestFlowFile) { private RequestBody getRequestBodyToSend(final ProcessSession session, final ProcessContext context, final FlowFile requestFlowFile) {
return new RequestBody() { return new RequestBody() {
@Override @Override
public MediaType contentType() { public MediaType contentType() {
final String attributeValue = requestFlowFile.getAttribute(CoreAttributes.MIME_TYPE.key()); String contentType = context.getProperty(PROP_CONTENT_TYPE).evaluateAttributeExpressions(requestFlowFile).getValue();
String contentType = attributeValue == null ? DEFAULT_CONTENT_TYPE : attributeValue; contentType = StringUtils.isBlank(contentType) ? DEFAULT_CONTENT_TYPE : contentType;
return MediaType.parse(contentType); return MediaType.parse(contentType);
} }

View File

@ -85,7 +85,6 @@ import org.apache.http.protocol.HttpCoreContext;
import org.apache.http.util.EntityUtils; import org.apache.http.util.EntityUtils;
import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.SupportsBatching; import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.documentation.Tags;
@ -122,15 +121,15 @@ import org.apache.nifi.util.ObjectHolder;
import org.apache.nifi.util.StopWatch; import org.apache.nifi.util.StopWatch;
import com.sun.jersey.api.client.ClientResponse.Status; import com.sun.jersey.api.client.ClientResponse.Status;
import org.apache.nifi.util.StringUtils;
@SupportsBatching @SupportsBatching
@InputRequirement(Requirement.INPUT_REQUIRED) @InputRequirement(Requirement.INPUT_REQUIRED)
@Tags({"http", "https", "remote", "copy", "archive"}) @Tags({"http", "https", "remote", "copy", "archive"})
@CapabilityDescription("Performs an HTTP Post with the content of the FlowFile") @CapabilityDescription("Performs an HTTP Post with the content of the FlowFile")
@ReadsAttribute(attribute = "mime.type", description = "If not sending data as a FlowFile, the mime.type attribute will be used to set the HTTP Header for Content-Type")
public class PostHTTP extends AbstractProcessor { public class PostHTTP extends AbstractProcessor {
public static final String CONTENT_TYPE = "Content-Type"; public static final String CONTENT_TYPE_HEADER = "Content-Type";
public static final String ACCEPT = "Accept"; public static final String ACCEPT = "Accept";
public static final String ACCEPT_ENCODING = "Accept-Encoding"; public static final String ACCEPT_ENCODING = "Accept-Encoding";
public static final String APPLICATION_FLOW_FILE_V1 = "application/flowfile"; public static final String APPLICATION_FLOW_FILE_V1 = "application/flowfile";
@ -249,6 +248,15 @@ public class PostHTTP extends AbstractProcessor {
.required(false) .required(false)
.addValidator(StandardValidators.PORT_VALIDATOR) .addValidator(StandardValidators.PORT_VALIDATOR)
.build(); .build();
public static final PropertyDescriptor CONTENT_TYPE = new PropertyDescriptor.Builder()
.name("Content-Type")
.description("The Content-Type to specify for the content of the FlowFile being POSTed if " + SEND_AS_FLOWFILE.getName() + " is false. "
+ "In the case of an empty value after evaluating an expression language expression, Content-Type defaults to " + DEFAULT_CONTENT_TYPE)
.required(true)
.expressionLanguageSupported(true)
.defaultValue("${" + CoreAttributes.MIME_TYPE.key() + "}")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final Relationship REL_SUCCESS = new Relationship.Builder() public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success") .name("success")
@ -289,6 +297,7 @@ public class PostHTTP extends AbstractProcessor {
properties.add(USER_AGENT); properties.add(USER_AGENT);
properties.add(PROXY_HOST); properties.add(PROXY_HOST);
properties.add(PROXY_PORT); properties.add(PROXY_PORT);
properties.add(CONTENT_TYPE);
this.properties = Collections.unmodifiableList(properties); this.properties = Collections.unmodifiableList(properties);
} }
@ -642,8 +651,8 @@ public class PostHTTP extends AbstractProcessor {
return; return;
} }
} else { } else {
final String attributeValue = toSend.get(0).getAttribute(CoreAttributes.MIME_TYPE.key()); final String contentTypeValue = context.getProperty(CONTENT_TYPE).evaluateAttributeExpressions(toSend.get(0)).getValue();
contentType = attributeValue == null ? DEFAULT_CONTENT_TYPE : attributeValue; contentType = StringUtils.isBlank(contentTypeValue) ? DEFAULT_CONTENT_TYPE : contentTypeValue;
} }
final String attributeHeaderRegex = context.getProperty(ATTRIBUTES_AS_HEADERS_REGEX).getValue(); final String attributeHeaderRegex = context.getProperty(ATTRIBUTES_AS_HEADERS_REGEX).getValue();
@ -659,7 +668,7 @@ public class PostHTTP extends AbstractProcessor {
} }
} }
post.setHeader(CONTENT_TYPE, contentType); post.setHeader(CONTENT_TYPE_HEADER, contentType);
post.setHeader(FLOWFILE_CONFIRMATION_HEADER, "true"); post.setHeader(FLOWFILE_CONFIRMATION_HEADER, "true");
post.setHeader(PROTOCOL_VERSION_HEADER, PROTOCOL_VERSION); post.setHeader(PROTOCOL_VERSION_HEADER, PROTOCOL_VERSION);
post.setHeader(TRANSACTION_ID_HEADER, transactionId); post.setHeader(TRANSACTION_ID_HEADER, transactionId);

View File

@ -17,6 +17,9 @@
package org.apache.nifi.processors.standard; package org.apache.nifi.processors.standard;
import java.io.IOException; import java.io.IOException;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Map;
import javax.servlet.ServletException; import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServlet;
@ -33,14 +36,28 @@ public class CaptureServlet extends HttpServlet {
private static final long serialVersionUID = 8402271018449653919L; private static final long serialVersionUID = 8402271018449653919L;
private volatile byte[] lastPost; private volatile byte[] lastPost;
private volatile Map<String, String> lastPostHeaders;
public byte[] getLastPost() { public byte[] getLastPost() {
return lastPost; return lastPost;
} }
public Map<String, String> getLastPostHeaders() {
return lastPostHeaders;
}
@Override @Override
protected void doPost(final HttpServletRequest request, final HttpServletResponse response) throws ServletException, IOException { protected void doPost(final HttpServletRequest request, final HttpServletResponse response) throws ServletException, IOException {
final ByteArrayOutputStream baos = new ByteArrayOutputStream(); final ByteArrayOutputStream baos = new ByteArrayOutputStream();
// Capture all the headers for reference. Intentionally choosing to not special handling for headers with multiple values for clarity
final Enumeration<String> headerNames = request.getHeaderNames();
lastPostHeaders = new HashMap<>();
while (headerNames.hasMoreElements()) {
final String nextHeader = headerNames.nextElement();
lastPostHeaders.put(nextHeader, request.getHeader(nextHeader));
}
try { try {
StreamUtils.copy(request.getInputStream(), baos); StreamUtils.copy(request.getInputStream(), baos);
this.lastPost = baos.toByteArray(); this.lastPost = baos.toByteArray();

View File

@ -24,6 +24,7 @@ import java.io.ByteArrayOutputStream;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.ssl.SSLContextService; import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.ssl.StandardSSLContextService; import org.apache.nifi.ssl.StandardSSLContextService;
import org.apache.nifi.util.FlowFileUnpackagerV3; import org.apache.nifi.util.FlowFileUnpackagerV3;
@ -32,6 +33,7 @@ import org.apache.nifi.util.TestRunners;
import org.eclipse.jetty.servlet.ServletHandler; import org.eclipse.jetty.servlet.ServletHandler;
import org.junit.After; import org.junit.After;
import org.junit.Test; import org.junit.Test;
import org.junit.Assert;
public class TestPostHTTP { public class TestPostHTTP {
@ -245,4 +247,56 @@ public class TestPostHTTP {
assertEquals("xyz.txt", receivedAttrs.get("filename")); assertEquals("xyz.txt", receivedAttrs.get("filename"));
} }
@Test
public void testSendWithMimeType() throws Exception {
setup(null);
runner.setProperty(PostHTTP.URL, server.getUrl());
final Map<String, String> attrs = new HashMap<>();
final String suppliedMimeType = "text/plain";
attrs.put(CoreAttributes.MIME_TYPE.key(), suppliedMimeType);
runner.enqueue("Camping is in tents.".getBytes(), attrs);
runner.run(1);
runner.assertAllFlowFilesTransferred(PostHTTP.REL_SUCCESS);
Map<String, String> lastPostHeaders = servlet.getLastPostHeaders();
Assert.assertEquals(suppliedMimeType, lastPostHeaders.get(PostHTTP.CONTENT_TYPE_HEADER));
}
@Test
public void testSendWithEmptyELExpression() throws Exception {
setup(null);
runner.setProperty(PostHTTP.URL, server.getUrl());
final Map<String, String> attrs = new HashMap<>();
attrs.put(CoreAttributes.MIME_TYPE.key(), "");
runner.enqueue("The wilderness downtown.".getBytes(), attrs);
runner.run(1);
runner.assertAllFlowFilesTransferred(PostHTTP.REL_SUCCESS);
Map<String, String> lastPostHeaders = servlet.getLastPostHeaders();
Assert.assertEquals(PostHTTP.DEFAULT_CONTENT_TYPE, lastPostHeaders.get(PostHTTP.CONTENT_TYPE_HEADER));
}
@Test
public void testSendWithContentTypeProperty() throws Exception {
setup(null);
final String suppliedMimeType = "text/plain";
runner.setProperty(PostHTTP.URL, server.getUrl());
runner.setProperty(PostHTTP.CONTENT_TYPE, suppliedMimeType);
final Map<String, String> attrs = new HashMap<>();
attrs.put(CoreAttributes.MIME_TYPE.key(), "text/csv");
runner.enqueue("Try this trick and spin it.".getBytes(), attrs);
runner.run(1);
runner.assertAllFlowFilesTransferred(PostHTTP.REL_SUCCESS);
Map<String, String> lastPostHeaders = servlet.getLastPostHeaders();
Assert.assertEquals(suppliedMimeType, lastPostHeaders.get(PostHTTP.CONTENT_TYPE_HEADER));
}
} }

View File

@ -805,7 +805,7 @@ public abstract class TestInvokeHttpCommon {
@Test @Test
public void testPost() throws Exception { public void testPost() throws Exception {
addHandler(new PostHandler()); addHandler(new MutativeMethodHandler(MutativeMethod.POST));
runner.setProperty(InvokeHTTP.PROP_METHOD, "POST"); runner.setProperty(InvokeHTTP.PROP_METHOD, "POST");
runner.setProperty(InvokeHTTP.PROP_URL, url + "/post"); runner.setProperty(InvokeHTTP.PROP_URL, url + "/post");
@ -837,9 +837,113 @@ public abstract class TestInvokeHttpCommon {
Assert.assertEquals(expected1, actual1); Assert.assertEquals(expected1, actual1);
} }
@Test
public void testPostWithMimeType() throws Exception {
final String suppliedMimeType = "text/plain";
addHandler(new MutativeMethodHandler(MutativeMethod.POST, suppliedMimeType));
runner.setProperty(InvokeHTTP.PROP_METHOD, "POST");
runner.setProperty(InvokeHTTP.PROP_URL, url + "/post");
final Map<String, String> attrs = new HashMap<>();
attrs.put(CoreAttributes.MIME_TYPE.key(), suppliedMimeType);
runner.enqueue("Hello".getBytes(), attrs);
runner.run(1);
runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 1);
runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 1);
}
@Test
public void testPostWithEmptyELExpression() throws Exception {
addHandler(new MutativeMethodHandler(MutativeMethod.POST, InvokeHTTP.DEFAULT_CONTENT_TYPE));
runner.setProperty(InvokeHTTP.PROP_METHOD, "POST");
runner.setProperty(InvokeHTTP.PROP_URL, url + "/post");
final Map<String, String> attrs = new HashMap<>();
attrs.put(CoreAttributes.MIME_TYPE.key(), "");
runner.enqueue("Hello".getBytes(), attrs);
runner.run(1);
runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 1);
runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 1);
}
@Test
public void testPostWithContentTypeProperty() throws Exception {
final String suppliedMimeType = "text/plain";
addHandler(new MutativeMethodHandler(MutativeMethod.POST, suppliedMimeType));
runner.setProperty(InvokeHTTP.PROP_METHOD, "POST");
runner.setProperty(InvokeHTTP.PROP_URL, url + "/post");
runner.setProperty(InvokeHTTP.PROP_CONTENT_TYPE, suppliedMimeType);
final Map<String, String> attrs = new HashMap<>();
attrs.put(CoreAttributes.MIME_TYPE.key(), "text/csv");
runner.enqueue("Hello".getBytes(), attrs);
runner.run(1);
runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 1);
runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 1);
}
@Test
public void testPutWithMimeType() throws Exception {
final String suppliedMimeType = "text/plain";
addHandler(new MutativeMethodHandler(MutativeMethod.PUT, suppliedMimeType));
runner.setProperty(InvokeHTTP.PROP_METHOD, "PUT");
runner.setProperty(InvokeHTTP.PROP_URL, url + "/post");
final Map<String, String> attrs = new HashMap<>();
attrs.put(CoreAttributes.MIME_TYPE.key(), suppliedMimeType);
runner.enqueue("Hello".getBytes(), attrs);
runner.run(1);
runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 1);
runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 1);
}
@Test
public void testPutWithEmptyELExpression() throws Exception {
addHandler(new MutativeMethodHandler(MutativeMethod.PUT, InvokeHTTP.DEFAULT_CONTENT_TYPE));
runner.setProperty(InvokeHTTP.PROP_METHOD, "PUT");
runner.setProperty(InvokeHTTP.PROP_URL, url + "/post");
final Map<String, String> attrs = new HashMap<>();
attrs.put(CoreAttributes.MIME_TYPE.key(), "");
runner.enqueue("Hello".getBytes(), attrs);
runner.run(1);
runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 1);
runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 1);
}
@Test
public void testPutWithContentTypeProperty() throws Exception {
final String suppliedMimeType = "text/plain";
addHandler(new MutativeMethodHandler(MutativeMethod.PUT, suppliedMimeType));
runner.setProperty(InvokeHTTP.PROP_METHOD, "PUT");
runner.setProperty(InvokeHTTP.PROP_URL, url + "/post");
runner.setProperty(InvokeHTTP.PROP_CONTENT_TYPE, suppliedMimeType);
final Map<String, String> attrs = new HashMap<>();
attrs.put(CoreAttributes.MIME_TYPE.key(), "text/csv");
runner.enqueue("Hello".getBytes(), attrs);
runner.run(1);
runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 1);
runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 1);
}
@Test @Test
public void testPut() throws Exception { public void testPut() throws Exception {
addHandler(new PutHandler()); addHandler(new MutativeMethodHandler(MutativeMethod.PUT));
runner.setProperty(InvokeHTTP.PROP_METHOD, "PUT"); runner.setProperty(InvokeHTTP.PROP_METHOD, "PUT");
runner.setProperty(InvokeHTTP.PROP_URL, url + "/post"); runner.setProperty(InvokeHTTP.PROP_URL, url + "/post");
@ -1109,17 +1213,30 @@ public abstract class TestInvokeHttpCommon {
} }
} }
public static class PostHandler extends AbstractHandler { private enum MutativeMethod { POST, PUT }
public static class MutativeMethodHandler extends AbstractHandler {
private final MutativeMethod method;
private final String expectedContentType;
public MutativeMethodHandler(final MutativeMethod method) {
this(method, "application/plain-text");
}
public MutativeMethodHandler(final MutativeMethod method, final String expectedContentType) {
this.method = method;
this.expectedContentType = expectedContentType;
}
@Override @Override
public void handle(String target, Request baseRequest, public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response)
HttpServletRequest request, HttpServletResponse response)
throws IOException, ServletException { throws IOException, ServletException {
baseRequest.setHandled(true); baseRequest.setHandled(true);
if("POST".equals(request.getMethod())) { if(method.name().equals(request.getMethod())) {
assertEquals("application/plain-text",request.getHeader("Content-Type")); assertEquals(this.expectedContentType,request.getHeader("Content-Type"));
final String body = request.getReader().readLine(); final String body = request.getReader().readLine();
assertEquals("Hello", body); assertEquals("Hello", body);
} else { } else {
@ -1129,28 +1246,7 @@ public abstract class TestInvokeHttpCommon {
} }
} }
}
public static class PutHandler extends AbstractHandler {
@Override
public void handle(String target, Request baseRequest,
HttpServletRequest request, HttpServletResponse response)
throws IOException, ServletException {
baseRequest.setHandled(true);
if("PUT".equalsIgnoreCase(request.getMethod())) {
assertEquals("application/plain-text",request.getHeader("Content-Type"));
final String body = request.getReader().readLine();
assertEquals("Hello", body);
} else {
response.setStatus(404);
response.setContentType("text/plain");
response.setContentLength(0);
}
}
} }
public static class GetOrHeadHandler extends AbstractHandler { public static class GetOrHeadHandler extends AbstractHandler {