NIFI-10161 Added Gzip Content-Encoding to InvokeHTTP and ListenHTTP

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #6150.
This commit is contained in:
exceptionfactory 2022-06-23 11:51:17 -05:00 committed by Pierre Villard
parent b6a32a9c5d
commit d8ebfb25d0
No known key found for this signature in database
GPG Key ID: F92A93B30C07C6D5
5 changed files with 199 additions and 47 deletions

View File

@ -73,6 +73,9 @@ import okhttp3.RequestBody;
import okhttp3.Response;
import okhttp3.ResponseBody;
import okio.BufferedSink;
import okio.GzipSink;
import okio.Okio;
import okio.Source;
import org.apache.commons.io.input.TeeInputStream;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.DynamicProperties;
@ -103,6 +106,7 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.standard.http.ContentEncodingStrategy;
import org.apache.nifi.processors.standard.http.FlowFileNamingStrategy;
import org.apache.nifi.processors.standard.http.CookieStrategy;
import org.apache.nifi.processors.standard.util.ProxyAuthenticator;
@ -186,6 +190,7 @@ public class InvokeHTTP extends AbstractProcessor {
private static final Pattern DYNAMIC_FORM_PARAMETER_NAME = Pattern.compile("post:form:(?<formDataName>.*)$");
private static final String FORM_DATA_NAME_GROUP = "formDataName";
private static final String CONTENT_ENCODING_HEADER = "Content-Encoding";
// properties
public static final PropertyDescriptor PROP_METHOD = new PropertyDescriptor.Builder()
@ -329,6 +334,15 @@ public class InvokeHTTP extends AbstractProcessor {
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
/**
 * Required property selecting the HTTP Content-Encoding applied to the request body.
 * Allowable values are the constants of {@link ContentEncodingStrategy}; the default is
 * {@link ContentEncodingStrategy#DISABLED}.
 */
public static final PropertyDescriptor PROP_CONTENT_ENCODING = new PropertyDescriptor.Builder()
.name("Content-Encoding")
.displayName("Content-Encoding")
.description("HTTP Content-Encoding applied to request body during transmission. The receiving server must support the selected encoding to avoid request failures.")
.required(true)
// The stored property value is whatever ContentEncodingStrategy.getValue() returns
.defaultValue(ContentEncodingStrategy.DISABLED.getValue())
.allowableValues(ContentEncodingStrategy.class)
.build();
public static final PropertyDescriptor PROP_CONTENT_TYPE = new PropertyDescriptor.Builder()
.name("Content-Type")
.description("The Content-Type to specify for when content is being transmitted through a PUT, POST or PATCH. "
@ -564,6 +578,7 @@ public class InvokeHTTP extends AbstractProcessor {
PROP_DIGEST_AUTH,
PROP_OUTPUT_RESPONSE_REGARDLESS,
PROP_ADD_HEADERS_TO_REQUEST,
PROP_CONTENT_ENCODING,
PROP_CONTENT_TYPE,
PROP_SEND_BODY,
PROP_USE_CHUNKED_ENCODING,
@ -1112,6 +1127,12 @@ public class InvokeHTTP extends AbstractProcessor {
}
}
// Resolve the configured strategy: the property value is the enum constant name
final String contentEncoding = context.getProperty(PROP_CONTENT_ENCODING).getValue();
final ContentEncodingStrategy contentEncodingStrategy = ContentEncodingStrategy.valueOf(contentEncoding);
if (ContentEncodingStrategy.GZIP == contentEncodingStrategy) {
    // Lowercase with Locale.ROOT so the header token is always ASCII "gzip";
    // default-locale lowercasing turns 'I' into dotless 'ı' under Turkish locales
    final String headerValue = ContentEncodingStrategy.GZIP.getValue().toLowerCase(java.util.Locale.ROOT);
    requestBuilder.addHeader(CONTENT_ENCODING_HEADER, headerValue);
}
// set the request method
String method = trimToEmpty(context.getProperty(PROP_METHOD).evaluateAttributeExpressions(requestFlowFile).getValue()).toUpperCase();
switch (method) {
@ -1119,15 +1140,15 @@ public class InvokeHTTP extends AbstractProcessor {
requestBuilder.get();
break;
case POST_METHOD:
RequestBody requestBody = getRequestBodyToSend(session, context, requestFlowFile);
RequestBody requestBody = getRequestBodyToSend(session, context, requestFlowFile, contentEncodingStrategy);
requestBuilder.post(requestBody);
break;
case PUT_METHOD:
requestBody = getRequestBodyToSend(session, context, requestFlowFile);
requestBody = getRequestBodyToSend(session, context, requestFlowFile, contentEncodingStrategy);
requestBuilder.put(requestBody);
break;
case PATCH_METHOD:
requestBody = getRequestBodyToSend(session, context, requestFlowFile);
requestBody = getRequestBodyToSend(session, context, requestFlowFile, contentEncodingStrategy);
requestBuilder.patch(requestBody);
break;
case HEAD_METHOD:
@ -1149,8 +1170,9 @@ public class InvokeHTTP extends AbstractProcessor {
}
private RequestBody getRequestBodyToSend(final ProcessSession session, final ProcessContext context,
final FlowFile requestFlowFile) {
final FlowFile requestFlowFile,
final ContentEncodingStrategy contentEncodingStrategy
) {
boolean sendBody = context.getProperty(PROP_SEND_BODY).asBoolean();
String evalContentType = context.getProperty(PROP_CONTENT_TYPE)
@ -1168,6 +1190,7 @@ public class InvokeHTTP extends AbstractProcessor {
}
}
final boolean contentLengthUnknown = useChunked || ContentEncodingStrategy.GZIP == contentEncodingStrategy;
RequestBody requestBody = new RequestBody() {
@Nullable
@Override
@ -1176,13 +1199,25 @@ public class InvokeHTTP extends AbstractProcessor {
}
@Override
public void writeTo(BufferedSink sink) {
session.exportTo(requestFlowFile, sink.outputStream());
public void writeTo(final BufferedSink sink) throws IOException {
final BufferedSink outputSink = (ContentEncodingStrategy.GZIP == contentEncodingStrategy)
? Okio.buffer(new GzipSink(sink))
: sink;
session.read(requestFlowFile, inputStream -> {
final Source source = Okio.source(inputStream);
outputSink.writeAll(source);
});
// Close Output Sink for gzip to write trailing bytes
if (ContentEncodingStrategy.GZIP == contentEncodingStrategy) {
outputSink.close();
}
}
@Override
public long contentLength() {
return useChunked ? -1 : requestFlowFile.getSize();
return contentLengthUnknown ? -1 : requestFlowFile.getSize();
}
};

View File

@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard.http;
import org.apache.nifi.components.DescribedValue;
/**
* HTTP Content-Encoding configuration strategy
*/
public enum ContentEncodingStrategy implements DescribedValue {
    /** Request body is sent without any content encoding */
    DISABLED("Content encoding not applied during transmission"),

    /** Request body is gzip-encoded and announced through the Content-Encoding header */
    GZIP("Gzip content encoding and HTTP Content-Encoding header applied during transmission");

    private final String explanation;

    ContentEncodingStrategy(final String explanation) {
        this.explanation = explanation;
    }

    /**
     * @return human-readable explanation supplied for this constant
     */
    @Override
    public String getDescription() {
        return this.explanation;
    }

    /**
     * @return enum constant name, shown as the display name
     */
    @Override
    public String getDisplayName() {
        return this.name();
    }

    /**
     * @return enum constant name, used as the property value
     */
    @Override
    public String getValue() {
        return this.name();
    }
}

View File

@ -101,6 +101,7 @@ public class ListenHTTPServlet extends HttpServlet {
public static final String GZIPPED_HEADER = "flowfile-gzipped";
public static final String PROTOCOL_VERSION_HEADER = "x-nifi-transfer-protocol-version";
public static final String PROTOCOL_VERSION = "3";
protected static final String CONTENT_ENCODING_HEADER = "Content-Encoding";
private final AtomicLong filesReceived = new AtomicLong(0L);
private final AtomicBoolean spaceAvailable = new AtomicBoolean(true);
@ -193,7 +194,10 @@ public class ListenHTTPServlet extends HttpServlet {
}
response.setHeader("Content-Type", MediaType.TEXT_PLAIN);
final boolean contentGzipped = Boolean.parseBoolean(request.getHeader(GZIPPED_HEADER));
final boolean flowFileGzipped = Boolean.parseBoolean(request.getHeader(GZIPPED_HEADER));
final String contentEncoding = request.getHeader(CONTENT_ENCODING_HEADER);
final boolean contentEncodingGzip = ACCEPT_ENCODING_VALUE.equals(contentEncoding);
final boolean contentGzipped = flowFileGzipped || contentEncodingGzip;
final X509Certificate[] certs = (X509Certificate[]) request.getAttribute("javax.servlet.request.X509Certificate");
foundSubject = DEFAULT_FOUND_SUBJECT;

View File

@ -19,10 +19,13 @@ package org.apache.nifi.processors.standard;
import okhttp3.mockwebserver.MockResponse;
import okhttp3.mockwebserver.MockWebServer;
import okhttp3.mockwebserver.RecordedRequest;
import okio.Buffer;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.oauth2.OAuth2AccessTokenProvider;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processors.standard.http.ContentEncodingStrategy;
import org.apache.nifi.processors.standard.http.FlowFileNamingStrategy;
import org.apache.nifi.processors.standard.http.CookieStrategy;
import org.apache.nifi.reporting.InitializationException;
@ -44,6 +47,7 @@ import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
@ -55,6 +59,7 @@ import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import java.util.stream.Stream;
import java.util.zip.GZIPInputStream;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
@ -101,6 +106,8 @@ public class InvokeHTTPTest {
private static final String CONTENT_LENGTH_HEADER = "Content-Length";
private static final String CONTENT_ENCODING_HEADER = "Content-Encoding";
private static final String CONTENT_TYPE_HEADER = "Content-Type";
private static final String LOCATION_HEADER = "Location";
@ -521,7 +528,7 @@ public class InvokeHTTPTest {
final String authorization = request.getHeader(AUTHORIZATION_HEADER);
assertNotNull(authorization, "Authorization Header not found");
final Pattern basicAuthPattern = Pattern.compile("^Basic [^\\s]+$");
final Pattern basicAuthPattern = Pattern.compile("^Basic \\S+$");
assertTrue(basicAuthPattern.matcher(authorization).matches(), "Basic Authentication not matched");
}
@ -723,6 +730,31 @@ public class InvokeHTTPTest {
assertRequestMethodSuccess(POST_METHOD);
}
@Test
public void testRunPostHttp200SuccessContentEncodingGzip() throws InterruptedException, IOException {
    // POST with the request body enabled and gzip Content-Encoding selected
    runner.setProperty(InvokeHTTP.PROP_METHOD, POST_METHOD);
    runner.setProperty(InvokeHTTP.PROP_CONTENT_ENCODING, ContentEncodingStrategy.GZIP.getValue());
    runner.setProperty(InvokeHTTP.PROP_SEND_BODY, Boolean.TRUE.toString());

    enqueueResponseCodeAndRun(HTTP_OK);

    assertResponseSuccessRelationships();
    assertRelationshipStatusCodeEquals(InvokeHTTP.REL_RESPONSE, HTTP_OK);

    final RecordedRequest recordedRequest = takeRequestCompleted();

    // No Content-Length header is expected on the gzip request
    assertNull(recordedRequest.getHeader(CONTENT_LENGTH_HEADER), "Content-Length Request Header found");

    final String expectedEncoding = ContentEncodingStrategy.GZIP.getValue().toLowerCase();
    assertEquals(expectedEncoding, recordedRequest.getHeader(CONTENT_ENCODING_HEADER));

    // Decompressing the recorded body must yield the original FlowFile content
    final Buffer recordedBody = recordedRequest.getBody();
    try (final GZIPInputStream decompressingStream = new GZIPInputStream(recordedBody.inputStream())) {
        assertEquals(FLOW_FILE_CONTENT, IOUtils.toString(decompressingStream, StandardCharsets.UTF_8));
    }
}
@Test
public void testRunPostHttp200SuccessChunkedEncoding() throws InterruptedException {
runner.setProperty(InvokeHTTP.PROP_METHOD, POST_METHOD);

View File

@ -16,6 +16,7 @@
*/
package org.apache.nifi.processors.standard;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
@ -45,8 +46,12 @@ import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.Response;
import okio.BufferedSink;
import okio.GzipSink;
import okio.Okio;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processors.standard.http.ContentEncodingStrategy;
import org.apache.nifi.processors.standard.http.HttpProtocolStrategy;
import org.apache.nifi.remote.io.socket.NetworkUtils;
import org.apache.nifi.reporting.InitializationException;
@ -54,6 +59,7 @@ import org.apache.nifi.security.util.SslContextFactory;
import org.apache.nifi.security.util.StandardTlsConfiguration;
import org.apache.nifi.security.util.TemporaryKeyStoreBuilder;
import org.apache.nifi.security.util.TlsConfiguration;
import org.apache.nifi.security.util.TlsPlatform;
import org.apache.nifi.serialization.record.MockRecordParser;
import org.apache.nifi.serialization.record.MockRecordWriter;
import org.apache.nifi.serialization.record.RecordFieldType;
@ -65,17 +71,17 @@ import org.apache.nifi.util.TestRunners;
import org.apache.nifi.web.util.ssl.SslContextUtils;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.util.thread.ThreadPool;
import org.junit.After;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.EnabledIf;
import org.mockito.Mockito;
import static org.apache.nifi.processors.standard.ListenHTTP.RELATIONSHIP_SUCCESS;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class TestListenHTTP {
@ -114,7 +120,11 @@ public class TestListenHTTP {
private int availablePort;
@BeforeClass
static boolean isTls13Supported() {
    // Referenced by name from @EnabledIf: reports whether the platform's latest protocol equals TLS_1_3
    final String latestProtocol = TlsPlatform.getLatestProtocol();
    return TLS_1_3.equals(latestProtocol);
}
@BeforeAll
public static void setUpSuite() throws GeneralSecurityException {
// generate new keystore and truststore
final TlsConfiguration tlsConfiguration = new TemporaryKeyStoreBuilder().build();
@ -172,7 +182,7 @@ public class TestListenHTTP {
);
}
@Before
@BeforeEach
public void setup() throws IOException {
proc = new ListenHTTP();
runner = TestRunners.newTestRunner(proc);
@ -181,7 +191,7 @@ public class TestListenHTTP {
runner.setVariable(BASEPATH_VARIABLE, HTTP_BASE_PATH);
}
@After
@AfterEach
public void shutdownServer() {
proc.shutdownHttpServer();
}
@ -360,13 +370,14 @@ public class TestListenHTTP {
startSecureServer();
final SSLSocketFactory sslSocketFactory = trustStoreSslContext.getSocketFactory();
final SSLSocket sslSocket = (SSLSocket) sslSocketFactory.createSocket(LOCALHOST, availablePort);
try (final SSLSocket sslSocket = (SSLSocket) sslSocketFactory.createSocket(LOCALHOST, availablePort)) {
final String currentProtocol = serverNoTruststoreConfiguration.getProtocol();
sslSocket.setEnabledProtocols(new String[]{currentProtocol});
sslSocket.startHandshake();
final SSLSession sslSession = sslSocket.getSession();
assertEquals("SSL Session Protocol not matched", currentProtocol, sslSession.getProtocol());
assertEquals(currentProtocol, sslSession.getProtocol());
}
}
@Test
@ -384,12 +395,9 @@ public class TestListenHTTP {
assertEquals(HttpServletResponse.SC_NO_CONTENT, responseCode);
}
@EnabledIf(value = "isTls13Supported", disabledReason = "TLSv1.3 is not supported")
@Test
public void testSecureServerRejectsUnsupportedTlsProtocolVersion() throws Exception {
final String currentProtocol = TlsConfiguration.getHighestCurrentSupportedTlsProtocolVersion();
final String protocolMessage = String.format("TLS Protocol required [%s] found [%s]", TLS_1_3, currentProtocol);
Assume.assumeTrue(protocolMessage, TLS_1_3.equals(currentProtocol));
configureProcessorSslContextService(ListenHTTP.ClientAuthentication.AUTO, serverTls_1_3_Configuration);
runner.setProperty(ListenHTTP.PORT, Integer.toString(availablePort));
@ -399,68 +407,58 @@ public class TestListenHTTP {
startWebServer();
final SSLSocketFactory sslSocketFactory = trustStoreSslContext.getSocketFactory();
final SSLSocket sslSocket = (SSLSocket) sslSocketFactory.createSocket(LOCALHOST, availablePort);
try (final SSLSocket sslSocket = (SSLSocket) sslSocketFactory.createSocket(LOCALHOST, availablePort)) {
sslSocket.setEnabledProtocols(new String[]{TLS_1_2});
assertThrows(SSLHandshakeException.class, sslSocket::startHandshake);
}
}
@Test
public void testMaxThreadPoolSizeTooLow() {
    // Thread pool size under test; validation is expected to reject it
    final String threadPoolSize = "7";

    runner.setProperty(ListenHTTP.PORT, Integer.toString(availablePort));
    runner.setProperty(ListenHTTP.BASE_PATH, HTTP_BASE_PATH);
    runner.setProperty(ListenHTTP.MAX_THREAD_POOL_SIZE, threadPoolSize);

    runner.assertNotValid();
}
@Test
public void testMaxThreadPoolSizeTooHigh() {
    // Thread pool size under test; validation is expected to reject it
    final String threadPoolSize = "1001";

    runner.setProperty(ListenHTTP.PORT, Integer.toString(availablePort));
    runner.setProperty(ListenHTTP.BASE_PATH, HTTP_BASE_PATH);
    runner.setProperty(ListenHTTP.MAX_THREAD_POOL_SIZE, threadPoolSize);

    runner.assertNotValid();
}
@Test
public void testMaxThreadPoolSizeOkLowerBound() {
    // Thread pool size under test; validation is expected to accept it
    final String threadPoolSize = "8";

    runner.setProperty(ListenHTTP.PORT, Integer.toString(availablePort));
    runner.setProperty(ListenHTTP.BASE_PATH, HTTP_BASE_PATH);
    runner.setProperty(ListenHTTP.MAX_THREAD_POOL_SIZE, threadPoolSize);

    runner.assertValid();
}
@Test
public void testMaxThreadPoolSizeOkUpperBound() {
    // Thread pool size under test; validation is expected to accept it
    final String threadPoolSize = "1000";

    runner.setProperty(ListenHTTP.PORT, Integer.toString(availablePort));
    runner.setProperty(ListenHTTP.BASE_PATH, HTTP_BASE_PATH);
    runner.setProperty(ListenHTTP.MAX_THREAD_POOL_SIZE, threadPoolSize);

    runner.assertValid();
}
@Test
public void testMaxThreadPoolSizeSpecifiedInThePropertyIsSetInTheServerInstance() {
// GIVEN
int maxThreadPoolSize = 201;
runner.setProperty(ListenHTTP.PORT, Integer.toString(availablePort));
runner.setProperty(ListenHTTP.BASE_PATH, HTTP_BASE_PATH);
runner.setProperty(ListenHTTP.MAX_THREAD_POOL_SIZE, Integer.toString(maxThreadPoolSize));
// WHEN
startWebServer();
// THEN
Server server = proc.getServer();
ThreadPool threadPool = server.getThreadPool();
ThreadPool.SizedThreadPool sizedThreadPool = (ThreadPool.SizedThreadPool) threadPool;
@ -518,6 +516,40 @@ public class TestListenHTTP {
runner.assertTransferCount(RELATIONSHIP_SUCCESS, 0);
}
@Test
public void testPostContentEncodingGzipAccepted() throws IOException {
    runner.setProperty(ListenHTTP.PORT, Integer.toString(availablePort));
    runner.setProperty(ListenHTTP.BASE_PATH, HTTP_BASE_PATH);
    runner.setProperty(ListenHTTP.RETURN_CODE, Integer.toString(HttpServletResponse.SC_NO_CONTENT));
    startWebServer();

    final OkHttpClient httpClient = getOkHttpClient(false, false);
    final String targetUrl = buildUrl(false);

    // Compress the payload up front so the request carries an already gzip-encoded body
    final String message = String.class.getSimpleName();
    final ByteArrayOutputStream compressedStream = new ByteArrayOutputStream();
    final BufferedSink compressingSink = Okio.buffer(new GzipSink(Okio.sink(compressedStream)));
    compressingSink.write(message.getBytes(StandardCharsets.UTF_8));
    // Close before reading the bytes so the complete gzip stream has been written
    compressingSink.close();

    final RequestBody compressedBody = RequestBody.create(compressedStream.toByteArray(), APPLICATION_OCTET_STREAM);
    final Request request = new Request.Builder()
            .url(targetUrl)
            .post(compressedBody)
            .addHeader("Content-Encoding", ContentEncodingStrategy.GZIP.getValue().toLowerCase())
            .build();

    try (final Response response = httpClient.newCall(request).execute()) {
        assertTrue(response.isSuccessful());

        // The received FlowFile must contain the decompressed message
        runner.assertTransferCount(RELATIONSHIP_SUCCESS, 1);
        final MockFlowFile received = runner.getFlowFilesForRelationship(RELATIONSHIP_SUCCESS).iterator().next();
        received.assertContentEquals(message);
    }
}
private MockRecordParser setupRecordReaderTest() throws InitializationException {
final MockRecordParser parser = new MockRecordParser();
final MockRecordWriter writer = new MockRecordWriter();
@ -636,7 +668,7 @@ public class TestListenHTTP {
for (final String message : messages) {
final int statusCode = postMessage(message, secure, clientAuthRequired);
assertEquals("HTTP Status Code not matched", expectedStatusCode, statusCode);
assertEquals(expectedStatusCode, statusCode, "HTTP Status Code not matched");
}
}
@ -690,7 +722,7 @@ public class TestListenHTTP {
try (Response response = client.newCall(request).execute()) {
Files.deleteIfExists(Paths.get(String.valueOf(file1)));
Files.deleteIfExists(Paths.get(String.valueOf(file2)));
Assert.assertTrue(String.format("Unexpected code: %s, body: %s", response.code(), response.body()), response.isSuccessful());
assertTrue(response.isSuccessful(), String.format("Unexpected code: %s, body: %s", response.code(), response.body()));
}
runner.assertAllFlowFilesTransferred(ListenHTTP.RELATIONSHIP_SUCCESS, 5);