NIFI-10530 MiNiFi: Add option to compress c2 requests

This closes #6439

Signed-off-by: Csaba Bejan <bejan.csaba@gmail.com>
This commit is contained in:
Ferenc Kis 2022-09-21 15:30:09 +02:00 committed by Csaba Bejan
parent cfb1597e10
commit 1b1d388ef7
No known key found for this signature in database
GPG Key ID: C59951609F8BDDEB
8 changed files with 219 additions and 6 deletions

View File

@ -40,7 +40,7 @@ public class C2ClientConfig {
private final long callTimeout;
private final long readTimeout;
private final long connectTimeout;
private final String c2RequestCompression;
private C2ClientConfig(final Builder builder) {
this.c2Url = builder.c2Url;
@ -62,6 +62,7 @@ public class C2ClientConfig {
this.truststoreType = builder.truststoreType;
this.readTimeout = builder.readTimeout;
this.connectTimeout = builder.connectTimeout;
this.c2RequestCompression = builder.c2RequestCompression;
}
public String getC2Url() {
@ -140,6 +141,10 @@ public class C2ClientConfig {
return connectTimeout;
}
public String getC2RequestCompression() {
return c2RequestCompression;
}
/**
* Builder for client configuration.
*/
@ -164,6 +169,7 @@ public class C2ClientConfig {
private String truststoreType;
private long readTimeout;
private long connectTimeout;
private String c2RequestCompression;
public Builder c2Url(final String c2Url) {
this.c2Url = c2Url;
@ -260,6 +266,11 @@ public class C2ClientConfig {
return this;
}
public Builder c2RequestCompression(final String c2RequestCompression) {
this.c2RequestCompression = c2RequestCompression;
return this;
}
public C2ClientConfig build() {
return new C2ClientConfig(this);
}

View File

@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.c2.client.http;
import java.io.FileInputStream;
@ -39,17 +40,17 @@ import okhttp3.logging.HttpLoggingInterceptor;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.c2.client.C2ClientConfig;
import org.apache.nifi.c2.client.api.C2Client;
import org.apache.nifi.c2.serializer.C2Serializer;
import org.apache.nifi.c2.protocol.api.C2Heartbeat;
import org.apache.nifi.c2.protocol.api.C2HeartbeatResponse;
import org.apache.nifi.c2.protocol.api.C2OperationAck;
import org.apache.nifi.c2.serializer.C2Serializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class C2HttpClient implements C2Client {
static final MediaType MEDIA_TYPE_APPLICATION_JSON = MediaType.parse("application/json");
private static final Logger logger = LoggerFactory.getLogger(C2HttpClient.class);
private static final MediaType MEDIA_TYPE_APPLICATION_JSON = MediaType.parse("application/json");
private final AtomicReference<OkHttpClient> httpClientReference = new AtomicReference<>();
private final C2ClientConfig clientConfig;
@ -126,6 +127,7 @@ public class C2HttpClient implements C2Client {
serializer.serialize(operationAck)
.map(operationAckBody -> RequestBody.create(operationAckBody, MEDIA_TYPE_APPLICATION_JSON))
.map(requestBody -> new Request.Builder().post(requestBody).url(clientConfig.getC2AckUrl()).build())
.map(C2RequestCompression.forType(clientConfig.getC2RequestCompression())::compress)
.ifPresent(this::sendAck);
}
@ -136,7 +138,9 @@ public class C2HttpClient implements C2Client {
.url(clientConfig.getC2Url())
.build();
try (Response heartbeatResponse = httpClientReference.get().newCall(request).execute()) {
Request decoratedRequest = C2RequestCompression.forType(clientConfig.getC2RequestCompression()).compress(request);
try (Response heartbeatResponse = httpClientReference.get().newCall(decoratedRequest).execute()) {
c2HeartbeatResponse = getResponseBody(heartbeatResponse).flatMap(response -> serializer.deserialize(response, C2HeartbeatResponse.class));
} catch (IOException ce) {
logger.error("Send Heartbeat failed [{}]", clientConfig.getC2Url(), ce);
@ -237,7 +241,7 @@ public class C2HttpClient implements C2Client {
}
private void sendAck(Request request) {
try(Response heartbeatResponse = httpClientReference.get().newCall(request).execute()) {
try (Response heartbeatResponse = httpClientReference.get().newCall(request).execute()) {
if (!heartbeatResponse.isSuccessful()) {
logger.warn("Acknowledgement was not successful with c2 server [{}] with status code {}", clientConfig.getC2AckUrl(), heartbeatResponse.code());
}

View File

@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.c2.client.http;
import java.io.IOException;
import java.util.stream.Stream;
import okhttp3.MediaType;
import okhttp3.Request;
import okhttp3.RequestBody;
import okio.BufferedSink;
import okio.GzipSink;
import okio.Okio;
public enum C2RequestCompression {
NONE("none") {
@Override
public Request compress(Request request) {
return request;
}
},
GZIP("gzip") {
@Override
public Request compress(Request request) {
return request.newBuilder()
.header(CONTENT_ENCODING_HEADER, GZIP_ENCODING)
.method(request.method(), toGzipRequestBody(request.body()))
.build();
}
private RequestBody toGzipRequestBody(RequestBody requestBody) {
return new RequestBody() {
@Override
public MediaType contentType() {
return requestBody.contentType();
}
@Override
public long contentLength() {
return -1;
}
@Override
public void writeTo(BufferedSink sink) throws IOException {
try (BufferedSink bufferedGzipSink = Okio.buffer(new GzipSink(sink))) {
requestBody.writeTo(bufferedGzipSink);
}
}
};
}
};
static final String CONTENT_ENCODING_HEADER = "Content-Encoding";
static final String GZIP_ENCODING = "gzip";
private final String compressionType;
C2RequestCompression(String compressionType) {
this.compressionType = compressionType;
}
public static C2RequestCompression forType(String compressionType) {
return Stream.of(values())
.filter(c2RequestCompression -> c2RequestCompression.compressionType.equalsIgnoreCase(compressionType))
.findAny()
.orElse(NONE);
}
public abstract Request compress(Request request);
}

View File

@ -0,0 +1,107 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.c2.client.http;
import static org.apache.nifi.c2.client.http.C2HttpClient.MEDIA_TYPE_APPLICATION_JSON;
import static org.apache.nifi.c2.client.http.C2RequestCompression.CONTENT_ENCODING_HEADER;
import static org.apache.nifi.c2.client.http.C2RequestCompression.GZIP;
import static org.apache.nifi.c2.client.http.C2RequestCompression.GZIP_ENCODING;
import static org.apache.nifi.c2.client.http.C2RequestCompression.NONE;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.io.IOException;
import java.util.stream.Stream;
import okhttp3.Request;
import okhttp3.RequestBody;
import okio.Buffer;
import okio.GzipSource;
import okio.Okio;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
public class C2RequestCompressionTest {
private static final String DEFAULT_C2_SERVER_URL = "http://localhost/c2";
private static final String DEFAULT_POST_BODY = "{ \"field\": \"value\" }";
private static Stream<Arguments> compressionTypes() {
return Stream.of(
Arguments.of("none", NONE),
Arguments.of("gzip", GZIP),
Arguments.of("unknown_compression_type", NONE)
);
}
@ParameterizedTest
@MethodSource("compressionTypes")
public void testAppropriateCompressionTypeIsGivenBackForType(String compressionType, C2RequestCompression expectedCompression) {
assertEquals(expectedCompression, C2RequestCompression.forType(compressionType));
}
@Test
public void testNoneCompressionShouldLeaveRequestBodyIntact() throws IOException {
// given
Request request = new Request.Builder()
.post(RequestBody.create(DEFAULT_POST_BODY, MEDIA_TYPE_APPLICATION_JSON))
.url(DEFAULT_C2_SERVER_URL)
.build();
// when
Request result = NONE.compress(request);
// then
assertTrue(result.body().contentType().toString().contains(MEDIA_TYPE_APPLICATION_JSON.toString()));
assertEquals(DEFAULT_POST_BODY, uncompressedRequestBodyToString(result));
}
@Test
public void testGzipCompressionShouldCompressRequestBodyAndAdjustRequestHeader() throws IOException {
// given
Request request = new Request.Builder()
.post(RequestBody.create(DEFAULT_POST_BODY, MEDIA_TYPE_APPLICATION_JSON))
.url(DEFAULT_C2_SERVER_URL)
.build();
// when
Request result = GZIP.compress(request);
// then
assertTrue(result.body().contentType().toString().contains(MEDIA_TYPE_APPLICATION_JSON.toString()));
assertEquals(GZIP_ENCODING, result.headers().get(CONTENT_ENCODING_HEADER));
assertEquals(DEFAULT_POST_BODY, gzippedRequestBodyToString(result));
}
private String uncompressedRequestBodyToString(Request request) throws IOException {
Buffer buffer = requestToBuffer(request);
return buffer.readUtf8();
}
private String gzippedRequestBodyToString(Request request) throws IOException {
Buffer buffer = requestToBuffer(request);
return Okio.buffer(new GzipSource(buffer)).readUtf8();
}
private Buffer requestToBuffer(Request request) throws IOException {
Buffer buffer = new Buffer();
request.body().writeTo(buffer);
return buffer;
}
}

View File

@ -136,7 +136,8 @@ nifi.minifi.notifier.ingestors.file.polling.period.seconds=5
4. Start MiNiFi
5. When a new flow is available on the C2 server, MiNiFi will download it via C2 and restart itself to pick up the changes
**Note:** Flow definitions are class based. Each class has one flow defined for it. As a result, all the agents belonging to the same class will get the flow at update.
**Note:** Flow definitions are class based. Each class has one flow defined for it. As a result, all the agents belonging to the same class will get the flow at update.<br>
**Note:** Compression can be turned on for C2 requests by setting `c2.request.compression=gzip`. Compression is turned off by default when the parameter is omitted, or when `c2.request.compression=none` is given. It can be beneficial to turn compression on to prevent network saturation.
## Loading a New Dataflow

View File

@ -158,6 +158,7 @@ java.arg.14=-Djava.awt.headless=true
#c2.security.keystore.location=
#c2.security.keystore.password=
#c2.security.keystore.type=JKS
#c2.request.compression=none
# The following ingestor configuration needs to be enabled in order to apply configuration updates coming from C2 server
#nifi.minifi.notifier.ingestors=org.apache.nifi.minifi.bootstrap.configuration.ingestors.FileChangeIngestor
#nifi.minifi.notifier.ingestors.file.config.path=./conf/config-new.yml

View File

@ -39,6 +39,7 @@ public class C2NiFiProperties {
public static final String C2_AGENT_CLASS_KEY = C2_PREFIX + "agent.class";
public static final String C2_AGENT_IDENTIFIER_KEY = C2_PREFIX + "agent.identifier";
public static final String C2_FULL_HEARTBEAT_KEY = C2_PREFIX + "full.heartbeat";
public static final String C2_REQUEST_COMPRESSION_KEY = C2_PREFIX + "request.compression";
public static final String C2_ROOT_CLASS_DEFINITIONS_KEY = C2_PREFIX + "root.class.definitions";
public static final String C2_METRICS_NAME_KEY = C2_ROOT_CLASS_DEFINITIONS_KEY + ".metrics.name";
@ -69,4 +70,7 @@ public class C2NiFiProperties {
public static final String C2_DEFAULT_READ_TIMEOUT = "5 sec";
// Call timeout of 10 seconds
public static final String C2_DEFAULT_CALL_TIMEOUT = "10 sec";
// C2 request compression is turned off by default
public static final String C2_REQUEST_COMPRESSION= "none";
}

View File

@ -115,6 +115,7 @@ public class C2NifiClientService {
.callTimeout((long) FormatUtils.getPreciseTimeDuration(properties.getProperty(C2NiFiProperties.C2_CALL_TIMEOUT,
C2NiFiProperties.C2_DEFAULT_CALL_TIMEOUT), TimeUnit.MILLISECONDS))
.c2Url(properties.getProperty(C2NiFiProperties.C2_REST_URL_KEY, ""))
.c2RequestCompression(properties.getProperty(C2NiFiProperties.C2_REQUEST_COMPRESSION_KEY, C2NiFiProperties.C2_REQUEST_COMPRESSION))
.confDirectory(properties.getProperty(C2NiFiProperties.C2_CONFIG_DIRECTORY_KEY, DEFAULT_CONF_DIR))
.runtimeManifestIdentifier(properties.getProperty(C2NiFiProperties.C2_RUNTIME_MANIFEST_IDENTIFIER_KEY, ""))
.runtimeType(properties.getProperty(C2NiFiProperties.C2_RUNTIME_TYPE_KEY, ""))