NIFI-1396 Correcting PostHttp chunked encoding functionality

This commit is contained in:
jpercivall 2016-01-25 16:57:17 -05:00
parent b3990ecdcf
commit ee30b27cc8
2 changed files with 62 additions and 5 deletions

View File

@ -227,10 +227,9 @@ public class PostHTTP extends AbstractProcessor {
.build(); .build();
public static final PropertyDescriptor CHUNKED_ENCODING = new PropertyDescriptor.Builder() public static final PropertyDescriptor CHUNKED_ENCODING = new PropertyDescriptor.Builder()
.name("Use Chunked Encoding") .name("Use Chunked Encoding")
.description("Specifies whether or not to use Chunked Encoding to send the data. If false, the entire content of the FlowFile will be buffered into memory.") .description("Specifies whether or not to use Chunked Encoding to send the data. This property is ignored in the event the contents are compressed "
.required(true) + "or sent as FlowFiles.")
.allowableValues("true", "false") .allowableValues("true", "false")
.defaultValue("true")
.build(); .build();
public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder() public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
.name("SSL Context Service") .name("SSL Context Service")
@ -331,6 +330,15 @@ public class PostHTTP extends AbstractProcessor {
.build()); .build());
} }
boolean sendAsFlowFile = context.getProperty(SEND_AS_FLOWFILE).asBoolean();
int compressionLevel = context.getProperty(COMPRESSION_LEVEL).asInteger();
boolean chunkedSet = context.getProperty(CHUNKED_ENCODING).isSet();
if(compressionLevel == 0 && !sendAsFlowFile && !chunkedSet) {
results.add(new ValidationResult.Builder().valid(false).subject(CHUNKED_ENCODING.getName())
.explanation("if compression level is 0 and not sending as a FlowFile, then the \'"+CHUNKED_ENCODING.getName()+"\' property must be set").build());
}
return results; return results;
} }
@ -625,9 +633,21 @@ public class PostHTTP extends AbstractProcessor {
out.flush(); out.flush();
} }
} }
}); }) {
entity.setChunked(context.getProperty(CHUNKED_ENCODING).asBoolean()); @Override
public long getContentLength() {
if(compressionLevel == 0 && !sendAsFlowFile && !context.getProperty(CHUNKED_ENCODING).asBoolean() ) {
return toSend.get(0).getSize();
} else {
return -1;
}
}
};
if(context.getProperty(CHUNKED_ENCODING).isSet()) {
entity.setChunked(context.getProperty(CHUNKED_ENCODING).asBoolean());
}
post.setEntity(entity); post.setEntity(entity);
post.setConfig(requestConfig); post.setConfig(requestConfig);

View File

@ -82,6 +82,7 @@ public class TestPostHTTP {
runner.setProperty(PostHTTP.URL, server.getSecureUrl()); runner.setProperty(PostHTTP.URL, server.getSecureUrl());
runner.setProperty(PostHTTP.SSL_CONTEXT_SERVICE, "ssl-context"); runner.setProperty(PostHTTP.SSL_CONTEXT_SERVICE, "ssl-context");
runner.setProperty(PostHTTP.CHUNKED_ENCODING, "false");
runner.enqueue("Hello world".getBytes()); runner.enqueue("Hello world".getBytes());
runner.run(); runner.run();
@ -113,6 +114,7 @@ public class TestPostHTTP {
runner.setProperty(PostHTTP.URL, server.getSecureUrl()); runner.setProperty(PostHTTP.URL, server.getSecureUrl());
runner.setProperty(PostHTTP.SSL_CONTEXT_SERVICE, "ssl-context"); runner.setProperty(PostHTTP.SSL_CONTEXT_SERVICE, "ssl-context");
runner.setProperty(PostHTTP.CHUNKED_ENCODING, "false");
runner.enqueue("Hello world".getBytes()); runner.enqueue("Hello world".getBytes());
runner.run(); runner.run();
@ -141,6 +143,7 @@ public class TestPostHTTP {
runner.setProperty(PostHTTP.URL, server.getSecureUrl()); runner.setProperty(PostHTTP.URL, server.getSecureUrl());
runner.setProperty(PostHTTP.SSL_CONTEXT_SERVICE, "ssl-context"); runner.setProperty(PostHTTP.SSL_CONTEXT_SERVICE, "ssl-context");
runner.setProperty(PostHTTP.CHUNKED_ENCODING, "false");
runner.enqueue("Hello world".getBytes()); runner.enqueue("Hello world".getBytes());
runner.run(); runner.run();
@ -186,6 +189,7 @@ public class TestPostHTTP {
assertEquals("World", new String(contentReceived)); assertEquals("World", new String(contentReceived));
assertEquals("abc", receivedAttrs.get("abc")); assertEquals("abc", receivedAttrs.get("abc"));
assertEquals("xyz.txt", receivedAttrs.get("filename")); assertEquals("xyz.txt", receivedAttrs.get("filename"));
Assert.assertNull(receivedAttrs.get("Content-Length"));
} }
@Test @Test
@ -258,18 +262,21 @@ public class TestPostHTTP {
final String suppliedMimeType = "text/plain"; final String suppliedMimeType = "text/plain";
attrs.put(CoreAttributes.MIME_TYPE.key(), suppliedMimeType); attrs.put(CoreAttributes.MIME_TYPE.key(), suppliedMimeType);
runner.enqueue("Camping is in tents.".getBytes(), attrs); runner.enqueue("Camping is in tents.".getBytes(), attrs);
runner.setProperty(PostHTTP.CHUNKED_ENCODING, "false");
runner.run(1); runner.run(1);
runner.assertAllFlowFilesTransferred(PostHTTP.REL_SUCCESS); runner.assertAllFlowFilesTransferred(PostHTTP.REL_SUCCESS);
Map<String, String> lastPostHeaders = servlet.getLastPostHeaders(); Map<String, String> lastPostHeaders = servlet.getLastPostHeaders();
Assert.assertEquals(suppliedMimeType, lastPostHeaders.get(PostHTTP.CONTENT_TYPE_HEADER)); Assert.assertEquals(suppliedMimeType, lastPostHeaders.get(PostHTTP.CONTENT_TYPE_HEADER));
Assert.assertEquals("20",lastPostHeaders.get("Content-Length"));
} }
@Test @Test
public void testSendWithEmptyELExpression() throws Exception { public void testSendWithEmptyELExpression() throws Exception {
setup(null); setup(null);
runner.setProperty(PostHTTP.URL, server.getUrl()); runner.setProperty(PostHTTP.URL, server.getUrl());
runner.setProperty(PostHTTP.CHUNKED_ENCODING, "false");
final Map<String, String> attrs = new HashMap<>(); final Map<String, String> attrs = new HashMap<>();
attrs.put(CoreAttributes.MIME_TYPE.key(), ""); attrs.put(CoreAttributes.MIME_TYPE.key(), "");
@ -289,6 +296,7 @@ public class TestPostHTTP {
final String suppliedMimeType = "text/plain"; final String suppliedMimeType = "text/plain";
runner.setProperty(PostHTTP.URL, server.getUrl()); runner.setProperty(PostHTTP.URL, server.getUrl());
runner.setProperty(PostHTTP.CONTENT_TYPE, suppliedMimeType); runner.setProperty(PostHTTP.CONTENT_TYPE, suppliedMimeType);
runner.setProperty(PostHTTP.CHUNKED_ENCODING, "false");
final Map<String, String> attrs = new HashMap<>(); final Map<String, String> attrs = new HashMap<>();
attrs.put(CoreAttributes.MIME_TYPE.key(), "text/csv"); attrs.put(CoreAttributes.MIME_TYPE.key(), "text/csv");
@ -322,6 +330,7 @@ public class TestPostHTTP {
Assert.assertEquals(suppliedMimeType, lastPostHeaders.get(PostHTTP.CONTENT_TYPE_HEADER)); Assert.assertEquals(suppliedMimeType, lastPostHeaders.get(PostHTTP.CONTENT_TYPE_HEADER));
// Ensure that a 'Content-Encoding' header was set with a 'gzip' value // Ensure that a 'Content-Encoding' header was set with a 'gzip' value
Assert.assertEquals(PostHTTP.CONTENT_ENCODING_GZIP_VALUE, lastPostHeaders.get(PostHTTP.CONTENT_ENCODING_HEADER)); Assert.assertEquals(PostHTTP.CONTENT_ENCODING_GZIP_VALUE, lastPostHeaders.get(PostHTTP.CONTENT_ENCODING_HEADER));
Assert.assertNull(lastPostHeaders.get("Content-Length"));
} }
@Test @Test
@ -332,6 +341,7 @@ public class TestPostHTTP {
runner.setProperty(PostHTTP.URL, server.getUrl()); runner.setProperty(PostHTTP.URL, server.getUrl());
runner.setProperty(PostHTTP.CONTENT_TYPE, suppliedMimeType); runner.setProperty(PostHTTP.CONTENT_TYPE, suppliedMimeType);
runner.setProperty(PostHTTP.COMPRESSION_LEVEL, "0"); runner.setProperty(PostHTTP.COMPRESSION_LEVEL, "0");
runner.setProperty(PostHTTP.CHUNKED_ENCODING, "false");
final Map<String, String> attrs = new HashMap<>(); final Map<String, String> attrs = new HashMap<>();
attrs.put(CoreAttributes.MIME_TYPE.key(), "text/plain"); attrs.put(CoreAttributes.MIME_TYPE.key(), "text/plain");
@ -345,6 +355,7 @@ public class TestPostHTTP {
Assert.assertEquals(suppliedMimeType, lastPostHeaders.get(PostHTTP.CONTENT_TYPE_HEADER)); Assert.assertEquals(suppliedMimeType, lastPostHeaders.get(PostHTTP.CONTENT_TYPE_HEADER));
// Ensure that the request was not sent with a 'Content-Encoding' header // Ensure that the request was not sent with a 'Content-Encoding' header
Assert.assertNull(lastPostHeaders.get(PostHTTP.CONTENT_ENCODING_HEADER)); Assert.assertNull(lastPostHeaders.get(PostHTTP.CONTENT_ENCODING_HEADER));
Assert.assertEquals("6200",lastPostHeaders.get("Content-Length"));
} }
@Test @Test
@ -370,4 +381,30 @@ public class TestPostHTTP {
// Ensure that the request was not sent with a 'Content-Encoding' header // Ensure that the request was not sent with a 'Content-Encoding' header
Assert.assertNull(lastPostHeaders.get(PostHTTP.CONTENT_ENCODING_HEADER)); Assert.assertNull(lastPostHeaders.get(PostHTTP.CONTENT_ENCODING_HEADER));
} }
@Test
public void testSendChunked() throws Exception {
setup(null);
final String suppliedMimeType = "text/plain";
runner.setProperty(PostHTTP.URL, server.getUrl());
runner.setProperty(PostHTTP.CONTENT_TYPE, suppliedMimeType);
runner.setProperty(PostHTTP.CHUNKED_ENCODING, "true");
final Map<String, String> attrs = new HashMap<>();
attrs.put(CoreAttributes.MIME_TYPE.key(), "text/plain");
runner.enqueue(StringUtils.repeat("This is the song that never ends. It goes on and on my friend.", 100).getBytes(), attrs);
runner.run(1);
runner.assertAllFlowFilesTransferred(PostHTTP.REL_SUCCESS);
byte[] postValue = servlet.getLastPost();
Assert.assertArrayEquals(StringUtils.repeat("This is the song that never ends. It goes on and on my friend.", 100).getBytes(),postValue);
Map<String, String> lastPostHeaders = servlet.getLastPostHeaders();
Assert.assertEquals(suppliedMimeType, lastPostHeaders.get(PostHTTP.CONTENT_TYPE_HEADER));
Assert.assertNull(lastPostHeaders.get("Content-Length"));
Assert.assertEquals("chunked",lastPostHeaders.get("Transfer-Encoding"));
}
} }