From 9ebcc9e4fa14d8ae9e6b312865f11c4661bc1a68 Mon Sep 17 00:00:00 2001 From: Aldrin Piri Date: Thu, 14 Jan 2016 13:24:20 -0500 Subject: [PATCH] NIFI-1393 Providing the ability to send using gzip Content-Encoding in PostHTTP if the endpoint server supports it regardless if the processor is configured to send as a FlowFile This closes #175 Signed-off-by: Matt Gilman --- .../nifi/processors/standard/PostHTTP.java | 92 ++++++++++--------- .../processors/standard/CaptureServlet.java | 5 +- .../processors/standard/TestPostHTTP.java | 71 ++++++++++++++ 3 files changed, 122 insertions(+), 46 deletions(-) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PostHTTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PostHTTP.java index aacdb6ac0b..4aba8fe763 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PostHTTP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PostHTTP.java @@ -141,6 +141,8 @@ public class PostHTTP extends AbstractProcessor { public static final String LOCATION_URI_INTENT_NAME = "x-location-uri-intent"; public static final String LOCATION_URI_INTENT_VALUE = "flowfile-hold"; public static final String GZIPPED_HEADER = "flowfile-gzipped"; + public static final String CONTENT_ENCODING_HEADER = "Content-Encoding"; + public static final String CONTENT_ENCODING_GZIP_VALUE = "gzip"; public static final String PROTOCOL_VERSION_HEADER = "x-nifi-transfer-protocol-version"; public static final String TRANSACTION_ID_HEADER = "x-nifi-transaction-id"; @@ -534,12 +536,7 @@ public class PostHTTP extends AbstractProcessor { destinationAccepts = config.getDestinationAccepts(); if (destinationAccepts == null) { try { - if (sendAsFlowFile) { - destinationAccepts = getDestinationAcceptance(client, url, getLogger(), transactionId); - } else { - destinationAccepts = new DestinationAccepts(false, false, false, false, null); - } - + destinationAccepts = getDestinationAcceptance(sendAsFlowFile, client, url, getLogger(), transactionId); config.setDestinationAccepts(destinationAccepts); } catch (final IOException e) { flowFile = session.penalize(flowFile); @@ -673,7 +670,11 @@ public class PostHTTP extends AbstractProcessor { post.setHeader(PROTOCOL_VERSION_HEADER, PROTOCOL_VERSION); post.setHeader(TRANSACTION_ID_HEADER, transactionId); if (compressionLevel > 0 && accepts.isGzipAccepted()) { - post.setHeader(GZIPPED_HEADER, "true"); + if (sendAsFlowFile) { + post.setHeader(GZIPPED_HEADER, "true"); + } else { + post.setHeader(CONTENT_ENCODING_HEADER, CONTENT_ENCODING_GZIP_VALUE); + } } // Do the actual POST @@ -841,57 +842,58 @@ public class PostHTTP extends AbstractProcessor { } } - private DestinationAccepts getDestinationAcceptance(final HttpClient client, final String uri, final ProcessorLog logger, final String transactionId) throws IOException { + private DestinationAccepts getDestinationAcceptance(final boolean sendAsFlowFile, final HttpClient client, final String uri, + final ProcessorLog logger, final String transactionId) throws IOException { final HttpHead head = new HttpHead(uri); - head.addHeader(TRANSACTION_ID_HEADER, transactionId); + if (sendAsFlowFile) { + head.addHeader(TRANSACTION_ID_HEADER, transactionId); + } final HttpResponse response = client.execute(head); + // we assume that the destination can support FlowFile v1 always when the processor is also configured to send as a FlowFile + // otherwise, we do not bother to make any determinations concerning this compatibility + final boolean acceptsFlowFileV1 = sendAsFlowFile; + boolean acceptsFlowFileV2 = false; + boolean acceptsFlowFileV3 = false; + boolean acceptsGzip = false; + Integer protocolVersion = null; + final int statusCode = response.getStatusLine().getStatusCode(); if (statusCode == Status.METHOD_NOT_ALLOWED.getStatusCode()) { - // we assume that the destination can support FlowFile v1 always. - return new DestinationAccepts(false, false, true, false, null); + return new DestinationAccepts(acceptsFlowFileV3, acceptsFlowFileV2, acceptsFlowFileV1, false, null); } else if (statusCode == Status.OK.getStatusCode()) { - boolean acceptsFlowFileV3 = false; - boolean acceptsFlowFileV2 = false; - boolean acceptsFlowFileV1 = true; - boolean acceptsGzip = false; - Integer protocolVersion = null; - Header[] headers = response.getHeaders(ACCEPT); - if (headers != null) { - for (final Header header : headers) { - for (final String accepted : header.getValue().split(",")) { - final String trimmed = accepted.trim(); - if (trimmed.equals(APPLICATION_FLOW_FILE_V3)) { - acceptsFlowFileV3 = true; - } else if (trimmed.equals(APPLICATION_FLOW_FILE_V2)) { - acceptsFlowFileV2 = true; - } else { - // we assume that the destination accepts FlowFile V1 because legacy versions - // of NiFi that accepted V1 did not use an Accept header to indicate it... or - // any other header. So the bets thing we can do is just assume that V1 is - // accepted, if we're going to send as FlowFile. - acceptsFlowFileV1 = true; + // If configured to send as a flowfile, determine the capabilities of the endpoint + if (sendAsFlowFile) { + if (headers != null) { + for (final Header header : headers) { + for (final String accepted : header.getValue().split(",")) { + final String trimmed = accepted.trim(); + if (trimmed.equals(APPLICATION_FLOW_FILE_V3)) { + acceptsFlowFileV3 = true; + } else if (trimmed.equals(APPLICATION_FLOW_FILE_V2)) { + acceptsFlowFileV2 = true; + } } } } - } - final Header destinationVersion = response.getFirstHeader(PROTOCOL_VERSION_HEADER); - if (destinationVersion != null) { - try { - protocolVersion = Integer.valueOf(destinationVersion.getValue()); - } catch (final NumberFormatException e) { - // nothing to do here really.... it's an invalid value, so treat the same as if not specified + final Header destinationVersion = response.getFirstHeader(PROTOCOL_VERSION_HEADER); + if (destinationVersion != null) { + try { + protocolVersion = Integer.valueOf(destinationVersion.getValue()); + } catch (final NumberFormatException e) { + // nothing to do here really.... it's an invalid value, so treat the same as if not specified + } } - } - if (acceptsFlowFileV3) { - logger.debug("Connection to URI " + uri + " will be using Content Type " + APPLICATION_FLOW_FILE_V3 + " if sending data as FlowFile"); - } else if (acceptsFlowFileV2) { - logger.debug("Connection to URI " + uri + " will be using Content Type " + APPLICATION_FLOW_FILE_V2 + " if sending data as FlowFile"); - } else if (acceptsFlowFileV1) { - logger.debug("Connection to URI " + uri + " will be using Content Type " + APPLICATION_FLOW_FILE_V1 + " if sending data as FlowFile"); + if (acceptsFlowFileV3) { + logger.debug("Connection to URI " + uri + " will be using Content Type " + APPLICATION_FLOW_FILE_V3 + " if sending data as FlowFile"); + } else if (acceptsFlowFileV2) { + logger.debug("Connection to URI " + uri + " will be using Content Type " + APPLICATION_FLOW_FILE_V2 + " if sending data as FlowFile"); + } else if (acceptsFlowFileV1) { + logger.debug("Connection to URI " + uri + " will be using Content Type " + APPLICATION_FLOW_FILE_V1 + " if sending data as FlowFile"); + } } headers = response.getHeaders(ACCEPT_ENCODING); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/CaptureServlet.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/CaptureServlet.java index 073ff525c2..b58e53238e 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/CaptureServlet.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/CaptureServlet.java @@ -71,6 +71,9 @@ public class CaptureServlet extends HttpServlet { protected void doHead(final HttpServletRequest request, final HttpServletResponse response) throws ServletException, IOException { response.setHeader("Accept", "application/flowfile-v3,application/flowfile-v2"); response.setHeader("x-nifi-transfer-protocol-version", "1"); - response.setHeader("Accept-Encoding", "gzip"); + // Unless an acceptGzip parameter is explicitly set to false, respond that this server accepts gzip + if (!Boolean.toString(false).equalsIgnoreCase(request.getParameter("acceptGzip"))) { + response.setHeader("Accept-Encoding", "gzip"); + } } } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPostHTTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPostHTTP.java index 274a9eda16..67bd82c731 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPostHTTP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPostHTTP.java @@ -24,6 +24,7 @@ import java.io.ByteArrayOutputStream; import java.util.HashMap; import java.util.Map; +import org.apache.commons.lang3.StringUtils; import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.ssl.SSLContextService; import org.apache.nifi.ssl.StandardSSLContextService; @@ -299,4 +300,74 @@ public class TestPostHTTP { Map lastPostHeaders = servlet.getLastPostHeaders(); Assert.assertEquals(suppliedMimeType, lastPostHeaders.get(PostHTTP.CONTENT_TYPE_HEADER)); } + + @Test + public void testSendWithCompressionServerAcceptGzip() throws Exception { + setup(null); + + final String suppliedMimeType = "text/plain"; + runner.setProperty(PostHTTP.URL, server.getUrl()); + runner.setProperty(PostHTTP.CONTENT_TYPE, suppliedMimeType); + runner.setProperty(PostHTTP.COMPRESSION_LEVEL, "9"); + + final Map attrs = new HashMap<>(); + attrs.put(CoreAttributes.MIME_TYPE.key(), "text/plain"); + + runner.enqueue(StringUtils.repeat("This is the song that never ends. It goes on and on my friend.", 100).getBytes(), attrs); + + runner.run(1); + runner.assertAllFlowFilesTransferred(PostHTTP.REL_SUCCESS); + + Map lastPostHeaders = servlet.getLastPostHeaders(); + Assert.assertEquals(suppliedMimeType, lastPostHeaders.get(PostHTTP.CONTENT_TYPE_HEADER)); + // Ensure that a 'Content-Encoding' header was set with a 'gzip' value + Assert.assertEquals(PostHTTP.CONTENT_ENCODING_GZIP_VALUE, lastPostHeaders.get(PostHTTP.CONTENT_ENCODING_HEADER)); + } + + @Test + public void testSendWithoutCompressionServerAcceptGzip() throws Exception { + setup(null); + + final String suppliedMimeType = "text/plain"; + runner.setProperty(PostHTTP.URL, server.getUrl()); + runner.setProperty(PostHTTP.CONTENT_TYPE, suppliedMimeType); + runner.setProperty(PostHTTP.COMPRESSION_LEVEL, "0"); + + final Map attrs = new HashMap<>(); + attrs.put(CoreAttributes.MIME_TYPE.key(), "text/plain"); + + runner.enqueue(StringUtils.repeat("This is the song that never ends. It goes on and on my friend.", 100).getBytes(), attrs); + + runner.run(1); + runner.assertAllFlowFilesTransferred(PostHTTP.REL_SUCCESS); + + Map lastPostHeaders = servlet.getLastPostHeaders(); + Assert.assertEquals(suppliedMimeType, lastPostHeaders.get(PostHTTP.CONTENT_TYPE_HEADER)); + // Ensure that the request was not sent with a 'Content-Encoding' header + Assert.assertNull(lastPostHeaders.get(PostHTTP.CONTENT_ENCODING_HEADER)); + } + + @Test + public void testSendWithCompressionServerNotAcceptGzip() throws Exception { + setup(null); + + final String suppliedMimeType = "text/plain"; + // Specify a property to the URL to have the CaptureServlet specify it doesn't accept gzip + runner.setProperty(PostHTTP.URL, server.getUrl()+"?acceptGzip=false"); + runner.setProperty(PostHTTP.CONTENT_TYPE, suppliedMimeType); + runner.setProperty(PostHTTP.COMPRESSION_LEVEL, "9"); + + final Map attrs = new HashMap<>(); + attrs.put(CoreAttributes.MIME_TYPE.key(), "text/plain"); + + runner.enqueue(StringUtils.repeat("This is the song that never ends. It goes on and on my friend.", 100).getBytes(), attrs); + + runner.run(1); + runner.assertAllFlowFilesTransferred(PostHTTP.REL_SUCCESS); + + Map lastPostHeaders = servlet.getLastPostHeaders(); + Assert.assertEquals(suppliedMimeType, lastPostHeaders.get(PostHTTP.CONTENT_TYPE_HEADER)); + // Ensure that the request was not sent with a 'Content-Encoding' header + Assert.assertNull(lastPostHeaders.get(PostHTTP.CONTENT_ENCODING_HEADER)); + } }