NIFI-1393 Providing the ability to send using gzip Content-Encoding in PostHTTP if the endpoint server supports it regardless if the processor is configured to send as a FlowFile This closes #175

Signed-off-by: Matt Gilman <matt.c.gilman@gmail.com>
This commit is contained in:
Aldrin Piri 2016-01-14 13:24:20 -05:00 committed by Matt Gilman
parent 2c9fb676cd
commit 9ebcc9e4fa
3 changed files with 122 additions and 46 deletions

View File

@ -141,6 +141,8 @@ public class PostHTTP extends AbstractProcessor {
public static final String LOCATION_URI_INTENT_NAME = "x-location-uri-intent";
public static final String LOCATION_URI_INTENT_VALUE = "flowfile-hold";
public static final String GZIPPED_HEADER = "flowfile-gzipped";
public static final String CONTENT_ENCODING_HEADER = "Content-Encoding";
public static final String CONTENT_ENCODING_GZIP_VALUE = "gzip";
public static final String PROTOCOL_VERSION_HEADER = "x-nifi-transfer-protocol-version";
public static final String TRANSACTION_ID_HEADER = "x-nifi-transaction-id";
@ -534,12 +536,7 @@ public class PostHTTP extends AbstractProcessor {
destinationAccepts = config.getDestinationAccepts();
if (destinationAccepts == null) {
try {
if (sendAsFlowFile) {
destinationAccepts = getDestinationAcceptance(client, url, getLogger(), transactionId);
} else {
destinationAccepts = new DestinationAccepts(false, false, false, false, null);
}
destinationAccepts = getDestinationAcceptance(sendAsFlowFile, client, url, getLogger(), transactionId);
config.setDestinationAccepts(destinationAccepts);
} catch (final IOException e) {
flowFile = session.penalize(flowFile);
@ -673,7 +670,11 @@ public class PostHTTP extends AbstractProcessor {
post.setHeader(PROTOCOL_VERSION_HEADER, PROTOCOL_VERSION);
post.setHeader(TRANSACTION_ID_HEADER, transactionId);
if (compressionLevel > 0 && accepts.isGzipAccepted()) {
if (sendAsFlowFile) {
post.setHeader(GZIPPED_HEADER, "true");
} else {
post.setHeader(CONTENT_ENCODING_HEADER, CONTENT_ENCODING_GZIP_VALUE);
}
}
// Do the actual POST
@ -841,23 +842,29 @@ public class PostHTTP extends AbstractProcessor {
}
}
private DestinationAccepts getDestinationAcceptance(final HttpClient client, final String uri, final ProcessorLog logger, final String transactionId) throws IOException {
private DestinationAccepts getDestinationAcceptance(final boolean sendAsFlowFile, final HttpClient client, final String uri,
final ProcessorLog logger, final String transactionId) throws IOException {
final HttpHead head = new HttpHead(uri);
if (sendAsFlowFile) {
head.addHeader(TRANSACTION_ID_HEADER, transactionId);
}
final HttpResponse response = client.execute(head);
final int statusCode = response.getStatusLine().getStatusCode();
if (statusCode == Status.METHOD_NOT_ALLOWED.getStatusCode()) {
// we assume that the destination can support FlowFile v1 always.
return new DestinationAccepts(false, false, true, false, null);
} else if (statusCode == Status.OK.getStatusCode()) {
boolean acceptsFlowFileV3 = false;
// we assume that the destination can support FlowFile v1 always when the processor is also configured to send as a FlowFile
// otherwise, we do not bother to make any determinations concerning this compatibility
final boolean acceptsFlowFileV1 = sendAsFlowFile;
boolean acceptsFlowFileV2 = false;
boolean acceptsFlowFileV1 = true;
boolean acceptsFlowFileV3 = false;
boolean acceptsGzip = false;
Integer protocolVersion = null;
final int statusCode = response.getStatusLine().getStatusCode();
if (statusCode == Status.METHOD_NOT_ALLOWED.getStatusCode()) {
return new DestinationAccepts(acceptsFlowFileV3, acceptsFlowFileV2, acceptsFlowFileV1, false, null);
} else if (statusCode == Status.OK.getStatusCode()) {
Header[] headers = response.getHeaders(ACCEPT);
// If configured to send as a flowfile, determine the capabilities of the endpoint
if (sendAsFlowFile) {
if (headers != null) {
for (final Header header : headers) {
for (final String accepted : header.getValue().split(",")) {
@ -866,12 +873,6 @@ public class PostHTTP extends AbstractProcessor {
acceptsFlowFileV3 = true;
} else if (trimmed.equals(APPLICATION_FLOW_FILE_V2)) {
acceptsFlowFileV2 = true;
} else {
// we assume that the destination accepts FlowFile V1 because legacy versions
// of NiFi that accepted V1 did not use an Accept header to indicate it... or
// any other header. So the bets thing we can do is just assume that V1 is
// accepted, if we're going to send as FlowFile.
acceptsFlowFileV1 = true;
}
}
}
@ -893,6 +894,7 @@ public class PostHTTP extends AbstractProcessor {
} else if (acceptsFlowFileV1) {
logger.debug("Connection to URI " + uri + " will be using Content Type " + APPLICATION_FLOW_FILE_V1 + " if sending data as FlowFile");
}
}
headers = response.getHeaders(ACCEPT_ENCODING);
if (headers != null) {

View File

@ -71,6 +71,9 @@ public class CaptureServlet extends HttpServlet {
protected void doHead(final HttpServletRequest request, final HttpServletResponse response) throws ServletException, IOException {
response.setHeader("Accept", "application/flowfile-v3,application/flowfile-v2");
response.setHeader("x-nifi-transfer-protocol-version", "1");
// Unless an acceptGzip parameter is explicitly set to false, respond that this server accepts gzip
if (!Boolean.toString(false).equalsIgnoreCase(request.getParameter("acceptGzip"))) {
response.setHeader("Accept-Encoding", "gzip");
}
}
}

View File

@ -24,6 +24,7 @@ import java.io.ByteArrayOutputStream;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.ssl.StandardSSLContextService;
@ -299,4 +300,74 @@ public class TestPostHTTP {
Map<String, String> lastPostHeaders = servlet.getLastPostHeaders();
Assert.assertEquals(suppliedMimeType, lastPostHeaders.get(PostHTTP.CONTENT_TYPE_HEADER));
}
@Test
public void testSendWithCompressionServerAcceptGzip() throws Exception {
setup(null);
final String suppliedMimeType = "text/plain";
runner.setProperty(PostHTTP.URL, server.getUrl());
runner.setProperty(PostHTTP.CONTENT_TYPE, suppliedMimeType);
runner.setProperty(PostHTTP.COMPRESSION_LEVEL, "9");
final Map<String, String> attrs = new HashMap<>();
attrs.put(CoreAttributes.MIME_TYPE.key(), "text/plain");
runner.enqueue(StringUtils.repeat("This is the song that never ends. It goes on and on my friend.", 100).getBytes(), attrs);
runner.run(1);
runner.assertAllFlowFilesTransferred(PostHTTP.REL_SUCCESS);
Map<String, String> lastPostHeaders = servlet.getLastPostHeaders();
Assert.assertEquals(suppliedMimeType, lastPostHeaders.get(PostHTTP.CONTENT_TYPE_HEADER));
// Ensure that a 'Content-Encoding' header was set with a 'gzip' value
Assert.assertEquals(PostHTTP.CONTENT_ENCODING_GZIP_VALUE, lastPostHeaders.get(PostHTTP.CONTENT_ENCODING_HEADER));
}
@Test
public void testSendWithoutCompressionServerAcceptGzip() throws Exception {
setup(null);
final String suppliedMimeType = "text/plain";
runner.setProperty(PostHTTP.URL, server.getUrl());
runner.setProperty(PostHTTP.CONTENT_TYPE, suppliedMimeType);
runner.setProperty(PostHTTP.COMPRESSION_LEVEL, "0");
final Map<String, String> attrs = new HashMap<>();
attrs.put(CoreAttributes.MIME_TYPE.key(), "text/plain");
runner.enqueue(StringUtils.repeat("This is the song that never ends. It goes on and on my friend.", 100).getBytes(), attrs);
runner.run(1);
runner.assertAllFlowFilesTransferred(PostHTTP.REL_SUCCESS);
Map<String, String> lastPostHeaders = servlet.getLastPostHeaders();
Assert.assertEquals(suppliedMimeType, lastPostHeaders.get(PostHTTP.CONTENT_TYPE_HEADER));
// Ensure that the request was not sent with a 'Content-Encoding' header
Assert.assertNull(lastPostHeaders.get(PostHTTP.CONTENT_ENCODING_HEADER));
}
@Test
public void testSendWithCompressionServerNotAcceptGzip() throws Exception {
setup(null);
final String suppliedMimeType = "text/plain";
// Specify a property to the URL to have the CaptureServlet specify it doesn't accept gzip
runner.setProperty(PostHTTP.URL, server.getUrl()+"?acceptGzip=false");
runner.setProperty(PostHTTP.CONTENT_TYPE, suppliedMimeType);
runner.setProperty(PostHTTP.COMPRESSION_LEVEL, "9");
final Map<String, String> attrs = new HashMap<>();
attrs.put(CoreAttributes.MIME_TYPE.key(), "text/plain");
runner.enqueue(StringUtils.repeat("This is the song that never ends. It goes on and on my friend.", 100).getBytes(), attrs);
runner.run(1);
runner.assertAllFlowFilesTransferred(PostHTTP.REL_SUCCESS);
Map<String, String> lastPostHeaders = servlet.getLastPostHeaders();
Assert.assertEquals(suppliedMimeType, lastPostHeaders.get(PostHTTP.CONTENT_TYPE_HEADER));
// Ensure that the request was not sent with a 'Content-Encoding' header
Assert.assertNull(lastPostHeaders.get(PostHTTP.CONTENT_ENCODING_HEADER));
}
}