NIFI-1620 Allow empty Content-Type in InvokeHTTP processor

This closes #272.

Signed-off-by: Aldrin Piri <aldrin@apache.org>
This commit is contained in:
Pierre Villard 2016-03-12 17:13:04 +01:00 committed by Aldrin Piri
parent 9546bef86e
commit e6250d18cf
2 changed files with 63 additions and 19 deletions

View File

@ -221,9 +221,18 @@ public final class InvokeHTTP extends AbstractProcessor {
.required(true) .required(true)
.expressionLanguageSupported(true) .expressionLanguageSupported(true)
.defaultValue("${" + CoreAttributes.MIME_TYPE.key() + "}") .defaultValue("${" + CoreAttributes.MIME_TYPE.key() + "}")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING))
.build(); .build();
public static final PropertyDescriptor PROP_SEND_BODY = new PropertyDescriptor.Builder()
.name("send-message-body")
.displayName("Send Message Body")
.description("If true, sends the HTTP message body on POST/PUT requests (default). If false, suppresses the message body and content-type header for these requests.")
.defaultValue("true")
.allowableValues("true", "false")
.required(false)
.build();
// Per RFC 7235, 2617, and 2616. // Per RFC 7235, 2617, and 2616.
// basic-credentials = base64-user-pass // basic-credentials = base64-user-pass
// base64-user-pass = userid ":" password // base64-user-pass = userid ":" password
@ -347,6 +356,7 @@ public final class InvokeHTTP extends AbstractProcessor {
PROP_TRUSTED_HOSTNAME, PROP_TRUSTED_HOSTNAME,
PROP_ADD_HEADERS_TO_REQUEST, PROP_ADD_HEADERS_TO_REQUEST,
PROP_CONTENT_TYPE, PROP_CONTENT_TYPE,
PROP_SEND_BODY,
PROP_USE_CHUNKED_ENCODING, PROP_USE_CHUNKED_ENCODING,
PROP_PENALIZE_NO_RETRY)); PROP_PENALIZE_NO_RETRY));
@ -761,24 +771,28 @@ public final class InvokeHTTP extends AbstractProcessor {
} }
private RequestBody getRequestBodyToSend(final ProcessSession session, final ProcessContext context, final FlowFile requestFlowFile) { private RequestBody getRequestBodyToSend(final ProcessSession session, final ProcessContext context, final FlowFile requestFlowFile) {
return new RequestBody() { if(context.getProperty(PROP_SEND_BODY).asBoolean()) {
@Override return new RequestBody() {
public MediaType contentType() { @Override
String contentType = context.getProperty(PROP_CONTENT_TYPE).evaluateAttributeExpressions(requestFlowFile).getValue(); public MediaType contentType() {
contentType = StringUtils.isBlank(contentType) ? DEFAULT_CONTENT_TYPE : contentType; String contentType = context.getProperty(PROP_CONTENT_TYPE).evaluateAttributeExpressions(requestFlowFile).getValue();
return MediaType.parse(contentType); contentType = StringUtils.isBlank(contentType) ? DEFAULT_CONTENT_TYPE : contentType;
} return MediaType.parse(contentType);
}
@Override @Override
public void writeTo(BufferedSink sink) throws IOException { public void writeTo(BufferedSink sink) throws IOException {
session.exportTo(requestFlowFile, sink.outputStream()); session.exportTo(requestFlowFile, sink.outputStream());
} }
@Override @Override
public long contentLength(){ public long contentLength(){
return useChunked ? -1 : requestFlowFile.getSize(); return useChunked ? -1 : requestFlowFile.getSize();
} }
}; };
} else {
return RequestBody.create(null, new byte[0]);
}
} }
private Request.Builder setHeaderProperties(final ProcessContext context, Request.Builder requestBuilder, final FlowFile requestFlowFile) { private Request.Builder setHeaderProperties(final ProcessContext context, Request.Builder requestBuilder, final FlowFile requestFlowFile) {

View File

@ -978,6 +978,26 @@ public abstract class TestInvokeHttpCommon {
runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 1); runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 1);
} }
@Test
public void testPostWithEmptyBodySet() throws Exception {
final String suppliedMimeType = "";
addHandler(new MutativeMethodHandler(MutativeMethod.POST, suppliedMimeType));
runner.setNonLoopConnection(false);
runner.setProperty(InvokeHTTP.PROP_METHOD, "POST");
runner.setProperty(InvokeHTTP.PROP_URL, url + "/post");
runner.setProperty(InvokeHTTP.PROP_CONTENT_TYPE, suppliedMimeType);
runner.setProperty(InvokeHTTP.PROP_SEND_BODY, "false");
final Map<String, String> attrs = new HashMap<>();
attrs.put(CoreAttributes.MIME_TYPE.key(), suppliedMimeType);
runner.enqueue("Hello".getBytes(), attrs);
runner.run(1);
runner.assertTransferCount(InvokeHTTP.REL_SUCCESS_REQ, 1);
runner.assertTransferCount(InvokeHTTP.REL_RESPONSE, 1);
}
@Test @Test
public void testPutWithMimeType() throws Exception { public void testPutWithMimeType() throws Exception {
final String suppliedMimeType = "text/plain"; final String suppliedMimeType = "text/plain";
@ -1385,10 +1405,20 @@ public abstract class TestInvokeHttpCommon {
baseRequest.setHandled(true); baseRequest.setHandled(true);
if(method.name().equals(request.getMethod())) { if(method.name().equals(request.getMethod())) {
assertEquals(this.expectedContentType,request.getHeader("Content-Type")); if(this.expectedContentType.isEmpty()) {
Assert.assertNull(request.getHeader("Content-Type"));
} else {
assertEquals(this.expectedContentType,request.getHeader("Content-Type"));
}
final String body = request.getReader().readLine(); final String body = request.getReader().readLine();
this.trackedHeaderValue = baseRequest.getHttpFields().get(headerToTrack); this.trackedHeaderValue = baseRequest.getHttpFields().get(headerToTrack);
assertEquals("Hello", body);
if(this.expectedContentType.isEmpty()) {
Assert.assertNull(body);
} else {
assertEquals("Hello", body);
}
} else { } else {
response.setStatus(404); response.setStatus(404);
response.setContentType("text/plain"); response.setContentType("text/plain");