NIFI-11366 Proxy aware C2 communication

Signed-off-by: Ferenc Erdei <ferdei@cloudera.com>
This closes #7125
This commit is contained in:
Ferenc Kis 2023-04-03 13:13:20 +02:00 committed by Ferenc Erdei
parent a148b6f348
commit d84ce83654
No known key found for this signature in database
GPG Key ID: F8FAAD09EA246008
21 changed files with 936 additions and 268 deletions

View File

@ -35,20 +35,20 @@ public interface C2Client {
Optional<C2HeartbeatResponse> publishHeartbeat(C2Heartbeat heartbeat);
/**
* Retrieve the content of the new flow from the C2 Server
* After operation completed the acknowledgment to be sent to the C2 Server
*
* @param flowUpdateUrl url where the content should be downloaded from
* @return the actual downloaded content. Will be empty if no content can be downloaded
*/
Optional<byte[]> retrieveUpdateContent(String flowUpdateUrl);
/**
* After operation completed the acknowledge to be sent to the C2 Server
*
* @param operationAck the acknowledge details to be sent
* @param operationAck the acknowledgment details to be sent
*/
void acknowledgeOperation(C2OperationAck operationAck);
/**
* Retrieve the content of the new flow from the C2 Server
*
* @param callbackUrl url where the content should be downloaded from
* @return the actual downloaded content. Will be empty if no content can be downloaded
*/
Optional<byte[]> retrieveUpdateContent(String callbackUrl);
/**
* Uploads a binary bundle to C2 server
*
@ -57,4 +57,13 @@ public interface C2Client {
* @return optional error message if any issues occurred
*/
Optional<String> uploadBundle(String callbackUrl, byte[] bundle);
/**
* Creates a callback URL according to proxy aware C2 settings
*
* @param absoluteUrl absolute url sent by C2 server
* @param relativeUrl relative url sent by C2 server
* @return an optional with content of finalised callback url
*/
Optional<String> getCallbackUrl(String absoluteUrl, String relativeUrl);
}

View File

@ -23,6 +23,9 @@ public class C2ClientConfig {
private final String c2Url;
private final String c2AckUrl;
private final String c2RestPathBase;
private final String c2RestPathHeartbeat;
private final String c2RestPathAcknowledge;
private final String agentClass;
private final String agentIdentifier;
private final boolean fullHeartbeat;
@ -48,6 +51,9 @@ public class C2ClientConfig {
private C2ClientConfig(final Builder builder) {
this.c2Url = builder.c2Url;
this.c2AckUrl = builder.c2AckUrl;
this.c2RestPathBase = builder.c2RestPathBase;
this.c2RestPathHeartbeat = builder.c2RestPathHeartbeat;
this.c2RestPathAcknowledge = builder.c2RestPathAcknowledge;
this.agentClass = builder.agentClass;
this.agentIdentifier = builder.agentIdentifier;
this.fullHeartbeat = builder.fullHeartbeat;
@ -87,6 +93,18 @@ public class C2ClientConfig {
return agentIdentifier;
}
public String getC2RestPathBase() {
return c2RestPathBase;
}
public String getC2RestPathHeartbeat() {
return c2RestPathHeartbeat;
}
public String getC2RestPathAcknowledge() {
return c2RestPathAcknowledge;
}
public boolean isFullHeartbeat() {
return fullHeartbeat;
}
@ -170,6 +188,9 @@ public class C2ClientConfig {
private String c2Url;
private String c2AckUrl;
private String c2RestPathBase;
private String c2RestPathHeartbeat;
private String c2RestPathAcknowledge;
private String agentClass;
private String agentIdentifier;
private boolean fullHeartbeat;
@ -192,117 +213,130 @@ public class C2ClientConfig {
private String c2RequestCompression;
private String c2AssetDirectory;
public Builder c2Url(final String c2Url) {
public Builder c2Url(String c2Url) {
this.c2Url = c2Url;
return this;
}
public Builder c2AckUrl(final String c2AckUrl) {
public Builder c2AckUrl(String c2AckUrl) {
this.c2AckUrl = c2AckUrl;
return this;
}
public Builder agentClass(final String agentClass) {
public Builder c2RestPathBase(String c2RestPathBase) {
this.c2RestPathBase = c2RestPathBase;
return this;
}
public Builder c2RestPathHeartbeat(String c2RestPathHeartbeat) {
this.c2RestPathHeartbeat = c2RestPathHeartbeat;
return this;
}
public Builder c2RestPathAcknowledge(String c2RestPathAcknowledge) {
this.c2RestPathAcknowledge = c2RestPathAcknowledge;
return this;
}
public Builder agentClass(String agentClass) {
this.agentClass = agentClass;
return this;
}
public Builder agentIdentifier(final String agentIdentifier) {
public Builder agentIdentifier(String agentIdentifier) {
this.agentIdentifier = agentIdentifier;
return this;
}
public Builder fullHeartbeat(final boolean fullHeartbeat) {
public Builder fullHeartbeat(boolean fullHeartbeat) {
this.fullHeartbeat = fullHeartbeat;
return this;
}
public Builder confDirectory(final String confDirectory) {
public Builder confDirectory(String confDirectory) {
this.confDirectory = confDirectory;
return this;
}
public Builder runtimeManifestIdentifier(final String runtimeManifestIdentifier) {
public Builder runtimeManifestIdentifier(String runtimeManifestIdentifier) {
this.runtimeManifestIdentifier = runtimeManifestIdentifier;
return this;
}
public Builder runtimeType(final String runtimeType) {
public Builder runtimeType(String runtimeType) {
this.runtimeType = runtimeType;
return this;
}
public Builder heartbeatPeriod(final long heartbeatPeriod) {
public Builder heartbeatPeriod(long heartbeatPeriod) {
this.heartbeatPeriod = heartbeatPeriod;
return this;
}
public Builder callTimeout(final long callTimeout) {
public Builder callTimeout(long callTimeout) {
this.callTimeout = callTimeout;
return this;
}
public Builder keystoreFilename(final String keystoreFilename) {
public Builder keystoreFilename(String keystoreFilename) {
this.keystoreFilename = keystoreFilename;
return this;
}
public Builder keystorePassword(final String keystorePass) {
public Builder keystorePassword(String keystorePass) {
this.keystorePass = keystorePass;
return this;
}
public Builder keyPassword(final String keyPass) {
public Builder keyPassword(String keyPass) {
this.keyPass = keyPass;
return this;
}
public Builder keystoreType(final String keystoreType) {
public Builder keystoreType(String keystoreType) {
this.keystoreType = keystoreType;
return this;
}
public Builder truststoreFilename(final String truststoreFilename) {
public Builder truststoreFilename(String truststoreFilename) {
this.truststoreFilename = truststoreFilename;
return this;
}
public Builder truststorePassword(final String truststorePass) {
public Builder truststorePassword(String truststorePass) {
this.truststorePass = truststorePass;
return this;
}
public Builder truststoreType(final String truststoreType) {
public Builder truststoreType(String truststoreType) {
this.truststoreType = truststoreType;
return this;
}
public Builder readTimeout(final long readTimeout) {
public Builder readTimeout(long readTimeout) {
this.readTimeout = readTimeout;
return this;
}
public Builder connectTimeout(final long connectTimeout) {
public Builder connectTimeout(long connectTimeout) {
this.connectTimeout = connectTimeout;
return this;
}
public Builder maxIdleConnections(final int maxIdleConnections) {
public Builder maxIdleConnections(int maxIdleConnections) {
this.maxIdleConnections = maxIdleConnections;
return this;
}
public Builder keepAliveDuration(final long keepAliveDuration) {
public Builder keepAliveDuration(long keepAliveDuration) {
this.keepAliveDuration = keepAliveDuration;
return this;
}
public Builder c2RequestCompression(final String c2RequestCompression) {
public Builder c2RequestCompression(String c2RequestCompression) {
this.c2RequestCompression = c2RequestCompression;
return this;
}
public Builder c2AssetDirectory(final String c2AssetDirectory) {
public Builder c2AssetDirectory(String c2AssetDirectory) {
this.c2AssetDirectory = c2AssetDirectory;
return this;
}

View File

@ -18,35 +18,25 @@
package org.apache.nifi.c2.client.http;
import static okhttp3.MultipartBody.FORM;
import static okhttp3.RequestBody.create;
import java.io.FileInputStream;
import java.io.IOException;
import java.security.KeyStore;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSocketFactory;
import javax.net.ssl.X509TrustManager;
import okhttp3.ConnectionPool;
import okhttp3.MediaType;
import okhttp3.MultipartBody;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.Response;
import okhttp3.ResponseBody;
import okhttp3.logging.HttpLoggingInterceptor;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.c2.client.C2ClientConfig;
import org.apache.nifi.c2.client.api.C2Client;
import org.apache.nifi.c2.client.http.url.C2UrlProvider;
import org.apache.nifi.c2.client.http.url.C2UrlProviderFactory;
import org.apache.nifi.c2.protocol.api.C2Heartbeat;
import org.apache.nifi.c2.protocol.api.C2HeartbeatResponse;
import org.apache.nifi.c2.protocol.api.C2OperationAck;
import org.apache.nifi.c2.serializer.C2Serializer;
import org.apache.nifi.security.ssl.StandardKeyStoreBuilder;
import org.apache.nifi.security.ssl.StandardSslContextBuilder;
import org.apache.nifi.security.ssl.StandardTrustManagerBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -58,40 +48,23 @@ public class C2HttpClient implements C2Client {
private static final String BUNDLE_FILE_NAME = "debug.tar.gz";
private static final MediaType BUNDLE_MIME_TYPE = MediaType.parse("application/gzip");
private final AtomicReference<OkHttpClient> httpClientReference = new AtomicReference<>();
private final C2ClientConfig clientConfig;
private final C2Serializer serializer;
private final C2UrlProvider c2UrlProvider;
private final AtomicReference<OkHttpClient> httpClientReference = new AtomicReference<>();
public C2HttpClient(C2ClientConfig clientConfig, C2Serializer serializer) {
super();
public static C2HttpClient create(C2ClientConfig clientConfig, C2Serializer serializer) {
OkHttpClient okHttpClient = new OkHttpClientProvider(clientConfig).okHttpClient();
C2UrlProvider c2UrlProvider = new C2UrlProviderFactory(clientConfig).create();
return new C2HttpClient(clientConfig, serializer, c2UrlProvider, okHttpClient);
}
C2HttpClient(C2ClientConfig clientConfig, C2Serializer serializer,
C2UrlProvider c2UrlProvider, OkHttpClient okHttpClient) {
this.clientConfig = clientConfig;
this.serializer = serializer;
final OkHttpClient.Builder okHttpClientBuilder = new OkHttpClient.Builder();
// Configure request and response logging
HttpLoggingInterceptor logging = new HttpLoggingInterceptor(logger::debug);
logging.setLevel(HttpLoggingInterceptor.Level.BASIC);
okHttpClientBuilder.addInterceptor(logging);
// Set whether to follow redirects
okHttpClientBuilder.followRedirects(true);
okHttpClientBuilder.connectionPool(new ConnectionPool(clientConfig.getMaxIdleConnections(), clientConfig.getKeepAliveDuration(), TimeUnit.MILLISECONDS));
// Timeouts
okHttpClientBuilder.connectTimeout(clientConfig.getConnectTimeout(), TimeUnit.MILLISECONDS);
okHttpClientBuilder.readTimeout(clientConfig.getReadTimeout(), TimeUnit.MILLISECONDS);
okHttpClientBuilder.callTimeout(clientConfig.getCallTimeout(), TimeUnit.MILLISECONDS);
// check if the ssl path is set and add the factory if so
if (StringUtils.isNotBlank(clientConfig.getKeystoreFilename())) {
try {
setSslSocketFactory(okHttpClientBuilder);
} catch (Exception e) {
throw new IllegalStateException("OkHttp TLS configuration failed", e);
}
}
httpClientReference.set(okHttpClientBuilder.build());
this.c2UrlProvider = c2UrlProvider;
this.httpClientReference.set(okHttpClient);
}
@Override
@ -100,12 +73,24 @@ public class C2HttpClient implements C2Client {
}
@Override
public Optional<byte[]> retrieveUpdateContent(String flowUpdateUrl) {
public void acknowledgeOperation(C2OperationAck operationAck) {
String c2AcknowledgeUrl = c2UrlProvider.getAcknowledgeUrl();
logger.info("Acknowledging Operation {} to C2 server {}", operationAck.getOperationId(), c2AcknowledgeUrl);
serializer.serialize(operationAck)
.map(operationAckBody -> RequestBody.create(operationAckBody, MEDIA_TYPE_APPLICATION_JSON))
.map(requestBody -> new Request.Builder().post(requestBody).url(c2AcknowledgeUrl).build())
.map(C2RequestCompression.forType(clientConfig.getC2RequestCompression())::compress)
.ifPresent(this::sendAck);
}
@Override
public Optional<byte[]> retrieveUpdateContent(String callbackUrl) {
Optional<byte[]> updateContent = Optional.empty();
final Request.Builder requestBuilder = new Request.Builder()
Request.Builder requestBuilder = new Request.Builder()
.get()
.url(flowUpdateUrl);
final Request request = requestBuilder.build();
.url(callbackUrl);
Request request = requestBuilder.build();
try (Response response = httpClientReference.get().newCall(request).execute()) {
Optional<ResponseBody> body = Optional.ofNullable(response.body());
@ -128,23 +113,13 @@ public class C2HttpClient implements C2Client {
return updateContent;
}
@Override
public void acknowledgeOperation(C2OperationAck operationAck) {
logger.info("Acknowledging Operation {} to C2 server {}", operationAck.getOperationId(), clientConfig.getC2AckUrl());
serializer.serialize(operationAck)
.map(operationAckBody -> create(operationAckBody, MEDIA_TYPE_APPLICATION_JSON))
.map(requestBody -> new Request.Builder().post(requestBody).url(clientConfig.getC2AckUrl()).build())
.map(C2RequestCompression.forType(clientConfig.getC2RequestCompression())::compress)
.ifPresent(this::sendAck);
}
@Override
public Optional<String> uploadBundle(String callbackUrl, byte[] bundle) {
Request request = new Request.Builder()
.url(callbackUrl)
.post(new MultipartBody.Builder()
.setType(FORM)
.addFormDataPart(MULTIPART_FORM_FILE_FIELD_NAME, BUNDLE_FILE_NAME, create(bundle, BUNDLE_MIME_TYPE))
.addFormDataPart(MULTIPART_FORM_FILE_FIELD_NAME, BUNDLE_FILE_NAME, RequestBody.create(bundle, BUNDLE_MIME_TYPE))
.build())
.build();
@ -161,11 +136,16 @@ public class C2HttpClient implements C2Client {
return Optional.empty();
}
@Override
public Optional<String> getCallbackUrl(String absoluteUrl, String relativeUrl) {
return c2UrlProvider.getCallbackUrl(absoluteUrl, relativeUrl);
}
private Optional<C2HeartbeatResponse> sendHeartbeat(String heartbeat) {
Optional<C2HeartbeatResponse> c2HeartbeatResponse = Optional.empty();
Request request = new Request.Builder()
.post(create(heartbeat, MEDIA_TYPE_APPLICATION_JSON))
.url(clientConfig.getC2Url())
.post(RequestBody.create(heartbeat, MEDIA_TYPE_APPLICATION_JSON))
.url(c2UrlProvider.getHeartbeatUrl())
.build();
Request decoratedRequest = C2RequestCompression.forType(clientConfig.getC2RequestCompression()).compress(request);
@ -173,7 +153,7 @@ public class C2HttpClient implements C2Client {
try (Response heartbeatResponse = httpClientReference.get().newCall(decoratedRequest).execute()) {
c2HeartbeatResponse = getResponseBody(heartbeatResponse).flatMap(response -> serializer.deserialize(response, C2HeartbeatResponse.class));
} catch (IOException ce) {
logger.error("Send Heartbeat failed to C2 server {}", clientConfig.getC2Url(), ce);
logger.error("Send Heartbeat failed to C2 server {}", c2UrlProvider.getHeartbeatUrl(), ce);
}
return c2HeartbeatResponse;
@ -192,81 +172,13 @@ public class C2HttpClient implements C2Client {
return Optional.ofNullable(responseBody);
}
private void setSslSocketFactory(OkHttpClient.Builder okHttpClientBuilder) throws Exception {
final String keystoreLocation = clientConfig.getKeystoreFilename();
final String keystoreType = clientConfig.getKeystoreType();
final String keystorePass = clientConfig.getKeystorePass();
assertKeystorePropertiesSet(keystoreLocation, keystorePass, keystoreType);
final KeyStore keyStore;
try (final FileInputStream keyStoreStream = new FileInputStream(keystoreLocation)) {
keyStore = new StandardKeyStoreBuilder()
.type(keystoreType)
.inputStream(keyStoreStream)
.password(keystorePass.toCharArray())
.build();
}
final String truststoreLocation = clientConfig.getTruststoreFilename();
final String truststorePass = clientConfig.getTruststorePass();
final String truststoreType = clientConfig.getTruststoreType();
assertTruststorePropertiesSet(truststoreLocation, truststorePass, truststoreType);
final KeyStore truststore;
try (final FileInputStream trustStoreStream = new FileInputStream(truststoreLocation)) {
truststore = new StandardKeyStoreBuilder()
.type(truststoreType)
.inputStream(trustStoreStream)
.password(truststorePass.toCharArray())
.build();
}
final X509TrustManager trustManager = new StandardTrustManagerBuilder().trustStore(truststore).build();
final SSLContext sslContext = new StandardSslContextBuilder()
.keyStore(keyStore)
.keyPassword(keystorePass.toCharArray())
.trustStore(truststore)
.build();
final SSLSocketFactory sslSocketFactory = sslContext.getSocketFactory();
okHttpClientBuilder.sslSocketFactory(sslSocketFactory, trustManager);
}
private void assertKeystorePropertiesSet(String location, String password, String type) {
if (location == null || location.isEmpty()) {
throw new IllegalArgumentException(clientConfig.getKeystoreFilename() + " is null or is empty");
}
if (password == null || password.isEmpty()) {
throw new IllegalArgumentException("The client's keystore filename is set but its password is not (or is empty). If the location is set, the password must also be.");
}
if (type == null || type.isEmpty()) {
throw new IllegalArgumentException("The client's keystore filename is set but its type is not (or is empty). If the location is set, the type must also be.");
}
}
private void assertTruststorePropertiesSet(String location, String password, String type) {
if (location == null || location.isEmpty()) {
throw new IllegalArgumentException("The client's truststore filename is not set or is empty");
}
if (password == null || password.isEmpty()) {
throw new IllegalArgumentException("The client's truststore filename is set but its password is not (or is empty). If the location is set, the password must also be.");
}
if (type == null || type.isEmpty()) {
throw new IllegalArgumentException("The client's truststore filename is set but its type is not (or is empty). If the location is set, the type must also be.");
}
}
private void sendAck(Request request) {
try (Response heartbeatResponse = httpClientReference.get().newCall(request).execute()) {
if (!heartbeatResponse.isSuccessful()) {
logger.warn("Acknowledgement was not successful with C2 server {} with status code {}", clientConfig.getC2AckUrl(), heartbeatResponse.code());
logger.warn("Acknowledgement was not successful with C2 server {} with status code {}", c2UrlProvider.getAcknowledgeUrl(), heartbeatResponse.code());
}
} catch (IOException e) {
logger.error("Could not transmit ack to C2 server {}", clientConfig.getC2AckUrl(), e);
logger.error("Could not transmit ack to C2 server {}", c2UrlProvider.getAcknowledgeUrl(), e);
}
}
}

View File

@ -0,0 +1,142 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.c2.client.http;
import java.io.FileInputStream;
import java.security.KeyStore;
import java.util.concurrent.TimeUnit;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSocketFactory;
import javax.net.ssl.X509TrustManager;
import okhttp3.ConnectionPool;
import okhttp3.OkHttpClient;
import okhttp3.logging.HttpLoggingInterceptor;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.c2.client.C2ClientConfig;
import org.apache.nifi.security.ssl.StandardKeyStoreBuilder;
import org.apache.nifi.security.ssl.StandardSslContextBuilder;
import org.apache.nifi.security.ssl.StandardTrustManagerBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class OkHttpClientProvider {
private static final Logger logger = LoggerFactory.getLogger(OkHttpClientProvider.class);
private final C2ClientConfig clientConfig;
public OkHttpClientProvider(C2ClientConfig clientConfig) {
this.clientConfig = clientConfig;
}
public OkHttpClient okHttpClient() {
OkHttpClient.Builder okHttpClientBuilder = new OkHttpClient.Builder();
// Configure request and response logging
HttpLoggingInterceptor logging = new HttpLoggingInterceptor(logger::debug);
logging.setLevel(HttpLoggingInterceptor.Level.BASIC);
okHttpClientBuilder.addInterceptor(logging);
// Set whether to follow redirects
okHttpClientBuilder.followRedirects(true);
okHttpClientBuilder.connectionPool(new ConnectionPool(clientConfig.getMaxIdleConnections(), clientConfig.getKeepAliveDuration(), TimeUnit.MILLISECONDS));
// Timeouts
okHttpClientBuilder.connectTimeout(clientConfig.getConnectTimeout(), TimeUnit.MILLISECONDS);
okHttpClientBuilder.readTimeout(clientConfig.getReadTimeout(), TimeUnit.MILLISECONDS);
okHttpClientBuilder.callTimeout(clientConfig.getCallTimeout(), TimeUnit.MILLISECONDS);
// check if the ssl path is set and add the factory if so
if (StringUtils.isNotBlank(clientConfig.getKeystoreFilename())) {
try {
setSslSocketFactory(okHttpClientBuilder);
} catch (Exception e) {
throw new IllegalStateException("OkHttp TLS configuration failed", e);
}
}
return okHttpClientBuilder.build();
}
private void setSslSocketFactory(OkHttpClient.Builder okHttpClientBuilder) throws Exception {
String keystoreLocation = clientConfig.getKeystoreFilename();
String keystoreType = clientConfig.getKeystoreType();
String keystorePass = clientConfig.getKeystorePass();
assertKeystorePropertiesSet(keystoreLocation, keystorePass, keystoreType);
KeyStore keyStore;
try (FileInputStream keyStoreStream = new FileInputStream(keystoreLocation)) {
keyStore = new StandardKeyStoreBuilder()
.type(keystoreType)
.inputStream(keyStoreStream)
.password(keystorePass.toCharArray())
.build();
}
String truststoreLocation = clientConfig.getTruststoreFilename();
String truststorePass = clientConfig.getTruststorePass();
String truststoreType = clientConfig.getTruststoreType();
assertTruststorePropertiesSet(truststoreLocation, truststorePass, truststoreType);
KeyStore truststore;
try (FileInputStream trustStoreStream = new FileInputStream(truststoreLocation)) {
truststore = new StandardKeyStoreBuilder()
.type(truststoreType)
.inputStream(trustStoreStream)
.password(truststorePass.toCharArray())
.build();
}
X509TrustManager trustManager = new StandardTrustManagerBuilder().trustStore(truststore).build();
SSLContext sslContext = new StandardSslContextBuilder()
.keyStore(keyStore)
.keyPassword(keystorePass.toCharArray())
.trustStore(truststore)
.build();
SSLSocketFactory sslSocketFactory = sslContext.getSocketFactory();
okHttpClientBuilder.sslSocketFactory(sslSocketFactory, trustManager);
}
private void assertKeystorePropertiesSet(String location, String password, String type) {
if (location == null || location.isEmpty()) {
throw new IllegalArgumentException(clientConfig.getKeystoreFilename() + " is null or is empty");
}
if (password == null || password.isEmpty()) {
throw new IllegalArgumentException("The client's keystore filename is set but its password is not (or is empty). If the location is set, the password must also be.");
}
if (type == null || type.isEmpty()) {
throw new IllegalArgumentException("The client's keystore filename is set but its type is not (or is empty). If the location is set, the type must also be.");
}
}
private void assertTruststorePropertiesSet(String location, String password, String type) {
if (location == null || location.isEmpty()) {
throw new IllegalArgumentException("The client's truststore filename is not set or is empty");
}
if (password == null || password.isEmpty()) {
throw new IllegalArgumentException("The client's truststore filename is set but its password is not (or is empty). If the location is set, the password must also be.");
}
if (type == null || type.isEmpty()) {
throw new IllegalArgumentException("The client's truststore filename is set but its type is not (or is empty). If the location is set, the type must also be.");
}
}
}

View File

@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.c2.client.http.url;
import java.util.Optional;
public interface C2UrlProvider {
/**
* Retrieves the url of the C2 server to send heartbeats to
*
* @return the url of the C2 server to send heartbeats to
*/
String getHeartbeatUrl();
/**
* Retrieves the url of the C2 server to send acknowledgements to
*
* @return the url of the C2 server to send acknowledgements to
*/
String getAcknowledgeUrl();
/**
* Retrieves the callback url of the C2 server according to the C2 configuration (proxy aware or not)
*
* @param absoluteUrl absolute url sent by the C2 server
* @param relativeUrl relative url sent by the C2 server
* @return the url of the C2 server to send requests to
*/
Optional<String> getCallbackUrl(String absoluteUrl, String relativeUrl);
}

View File

@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.c2.client.http.url;
import static org.apache.commons.lang3.StringUtils.isNoneBlank;
import org.apache.nifi.c2.client.C2ClientConfig;
public class C2UrlProviderFactory {
private static final String INCORRECT_SETTINGS_ERROR_MESSAGE = "Incorrect configuration. Please revisit C2 URL properties." +
"Either c2.rest.url and c2.rest.url.ack have to be set," +
"either c2.rest.path.base, c2.rest.path.heartbeat and c2.rest.path.acknowledge have to configured";
private final C2ClientConfig clientConfig;
public C2UrlProviderFactory(C2ClientConfig clientConfig) {
this.clientConfig = clientConfig;
}
public C2UrlProvider create() {
if (isNoneBlank(clientConfig.getC2RestPathBase(), clientConfig.getC2RestPathHeartbeat(), clientConfig.getC2RestPathAcknowledge())) {
return new ProxyAwareC2UrlProvider(clientConfig.getC2RestPathBase(), clientConfig.getC2RestPathHeartbeat(), clientConfig.getC2RestPathAcknowledge());
} else if (isNoneBlank(clientConfig.getC2Url(), clientConfig.getC2AckUrl())) {
return new LegacyC2UrlProvider(clientConfig.getC2Url(), clientConfig.getC2AckUrl());
} else {
throw new IllegalArgumentException(INCORRECT_SETTINGS_ERROR_MESSAGE);
}
}
}

View File

@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.c2.client.http.url;
import java.util.Optional;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class LegacyC2UrlProvider implements C2UrlProvider {
private static final Logger LOG = LoggerFactory.getLogger(LegacyC2UrlProvider.class);
private final String c2Url;
private final String c2AckUrl;
LegacyC2UrlProvider(String c2Url, String c2AckUrl) {
this.c2Url = c2Url;
this.c2AckUrl = c2AckUrl;
}
@Override
public String getHeartbeatUrl() {
return c2Url;
}
@Override
public String getAcknowledgeUrl() {
return c2AckUrl;
}
@Override
public Optional<String> getCallbackUrl(String absoluteUrl, String relativeUrl) {
Optional<String> url = Optional.ofNullable(absoluteUrl).filter(StringUtils::isNotBlank);
if (!url.isPresent()) {
LOG.error("Provided absolute url was empty or null. Relative urls are not supported with this configuration");
}
return url;
}
}

View File

@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.c2.client.http.url;
import static org.apache.commons.lang3.StringUtils.appendIfMissing;
import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.commons.lang3.StringUtils.stripStart;
import java.util.Optional;
import okhttp3.HttpUrl;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ProxyAwareC2UrlProvider implements C2UrlProvider {
private static final Logger LOG = LoggerFactory.getLogger(ProxyAwareC2UrlProvider.class);
private static final String SLASH = "/";
private final HttpUrl c2RestPathBase;
private final String c2RestPathHeartbeat;
private final String c2RestPathAcknowledge;
ProxyAwareC2UrlProvider(String c2RestPathBase, String c2RestPathHeartbeat, String c2RestPathAcknowledge) {
this.c2RestPathBase = Optional.ofNullable(c2RestPathBase)
.filter(StringUtils::isNotBlank)
.map(apiBase -> appendIfMissing(apiBase, SLASH)) // trailing slash needs to be added for proper URL creation
.map(HttpUrl::parse)
.orElseThrow(() -> new IllegalArgumentException("Parameter c2RestPathBase should not be null or empty and should be a valid URL"));
this.c2RestPathHeartbeat = toAbsoluteUrl(c2RestPathHeartbeat)
.orElseThrow(() -> new IllegalArgumentException("Unable to convert c2RestPathHeartbeat to absolute url. Please check C2 configuration"));
this.c2RestPathAcknowledge = toAbsoluteUrl(c2RestPathAcknowledge)
.orElseThrow(() -> new IllegalArgumentException("Unable to convert c2RestPathAcknowledge to absolute url. Please check C2 configuration"));
}
@Override
public String getHeartbeatUrl() {
return c2RestPathHeartbeat;
}
@Override
public String getAcknowledgeUrl() {
return c2RestPathAcknowledge;
}
@Override
public Optional<String> getCallbackUrl(String absoluteUrl, String relativeUrl) {
return Optional.ofNullable(relativeUrl)
.map(this::toAbsoluteUrl)
.filter(Optional::isPresent)
.orElseGet(() -> Optional.ofNullable(absoluteUrl).filter(StringUtils::isNotBlank));
}
private Optional<String> toAbsoluteUrl(String path) {
if (isBlank(path)) {
LOG.error("Unable to convert to absolute url, provided path was null or empty");
return Optional.empty();
}
try {
return Optional.of(c2RestPathBase.resolve(stripStart(path, SLASH)).toString()); // leading slash needs to be removed for proper URL creation
} catch (Exception e) {
LOG.error("Unable to convert restBase=" + c2RestPathBase + " and restPath=" + path + " to absolute url", e);
return Optional.empty();
}
}
}

View File

@ -22,6 +22,7 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.when;
import java.io.IOException;
@ -45,6 +46,7 @@ import org.mockito.junit.jupiter.MockitoExtension;
@ExtendWith(MockitoExtension.class)
public class C2HttpClientTest {
public static final String INCORRECT_PATH = "http://localhost/incorrectPath";
private static final String HEARTBEAT_PATH = "c2/heartbeat";
private static final String UPDATE_PATH = "c2/update";
private static final String ACK_PATH = "c2/acknowledge";
@ -59,8 +61,6 @@ public class C2HttpClientTest {
@Mock
private C2Serializer serializer;
private C2HttpClient c2HttpClient;
private MockWebServer mockWebServer;
private String baseUrl;
@ -71,7 +71,8 @@ public class C2HttpClientTest {
baseUrl = mockWebServer.url("/").newBuilder().host("localhost").build().toString();
when(c2ClientConfig.getKeepAliveDuration()).thenReturn(KEEP_ALIVE_DURATION);
when(c2ClientConfig.getMaxIdleConnections()).thenReturn(MAX_IDLE_CONNECTIONS);
c2HttpClient = new C2HttpClient(c2ClientConfig, serializer);
lenient().when(c2ClientConfig.getC2Url()).thenReturn(baseUrl + HEARTBEAT_PATH);
lenient().when(c2ClientConfig.getC2AckUrl()).thenReturn(baseUrl + ACK_PATH);
}
@AfterEach
@ -86,8 +87,8 @@ public class C2HttpClientTest {
when(serializer.serialize(any(C2Heartbeat.class))).thenReturn(Optional.of("Heartbeat"));
when(serializer.deserialize(any(), any())).thenReturn(Optional.of(hbResponse));
when(c2ClientConfig.getC2Url()).thenReturn(baseUrl + HEARTBEAT_PATH);
C2HttpClient c2HttpClient = C2HttpClient.create(c2ClientConfig, serializer);
Optional<C2HeartbeatResponse> response = c2HttpClient.publishHeartbeat(new C2Heartbeat());
assertTrue(response.isPresent());
@ -100,8 +101,10 @@ public class C2HttpClientTest {
@Test
void testPublishHeartbeatReturnEmptyInCaseOfCommunicationIssue() {
when(serializer.serialize(any(C2Heartbeat.class))).thenReturn(Optional.of("Heartbeat"));
when(c2ClientConfig.getC2Url()).thenReturn("http://localhost/incorrectPath");
when(c2ClientConfig.getC2Url()).thenReturn(INCORRECT_PATH);
when(c2ClientConfig.getC2AckUrl()).thenReturn(INCORRECT_PATH);
C2HttpClient c2HttpClient = C2HttpClient.create(c2ClientConfig, serializer);
Optional<C2HeartbeatResponse> response = c2HttpClient.publishHeartbeat(new C2Heartbeat());
assertFalse(response.isPresent());
@ -111,7 +114,7 @@ public class C2HttpClientTest {
void testConstructorThrowsExceptionForInvalidKeystoreFilenameAtInitialization() {
when(c2ClientConfig.getKeystoreFilename()).thenReturn("incorrectKeystoreFilename");
IllegalStateException exception = assertThrows(IllegalStateException.class, () -> new C2HttpClient(c2ClientConfig, serializer));
IllegalStateException exception = assertThrows(IllegalStateException.class, () -> C2HttpClient.create(c2ClientConfig, serializer));
assertTrue(exception.getMessage().contains("TLS"));
}
@ -120,6 +123,7 @@ public class C2HttpClientTest {
void testRetrieveUpdateContentReturnsEmptyWhenServerErrorResponse() throws InterruptedException {
mockWebServer.enqueue(new MockResponse().setBody("updateContent").setResponseCode(HTTP_STATUS_BAD_REQUEST));
C2HttpClient c2HttpClient = C2HttpClient.create(c2ClientConfig, serializer);
Optional<byte[]> response = c2HttpClient.retrieveUpdateContent(baseUrl + UPDATE_PATH);
assertFalse(response.isPresent());
@ -133,6 +137,7 @@ public class C2HttpClientTest {
String content = "updateContent";
mockWebServer.enqueue(new MockResponse().setBody(content).setResponseCode(HTTP_STATUS_OK));
C2HttpClient c2HttpClient = C2HttpClient.create(c2ClientConfig, serializer);
Optional<byte[]> response = c2HttpClient.retrieveUpdateContent(baseUrl + UPDATE_PATH);
assertTrue(response.isPresent());
@ -145,10 +150,10 @@ public class C2HttpClientTest {
@Test
void testAcknowledgeOperation() throws InterruptedException {
String ackContent = "ack";
when(c2ClientConfig.getC2AckUrl()).thenReturn(baseUrl + ACK_PATH);
when(serializer.serialize(any(C2OperationAck.class))).thenReturn(Optional.of(ackContent));
mockWebServer.enqueue(new MockResponse().setResponseCode(HTTP_STATUS_OK));
C2HttpClient c2HttpClient = C2HttpClient.create(c2ClientConfig, serializer);
c2HttpClient.acknowledgeOperation(new C2OperationAck());
RecordedRequest request = mockWebServer.takeRequest();

View File

@ -0,0 +1,96 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.c2.client.http.url;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertThrowsExactly;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.when;
import org.apache.nifi.c2.client.C2ClientConfig;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
@ExtendWith(MockitoExtension.class)
public class C2UrlProviderFactoryTest {
private static final String C2_REST_BASE = "https://host:8080/c2/api";
private static final String HEARTBEAT_PATH = "/heartbeat";
private static final String ACKNOWLEDGE_PATH = "/acknowledge";
@Mock
private C2ClientConfig clientConfig;
@InjectMocks
private C2UrlProviderFactory testC2UrlProviderFactory;
@Test
public void testProxyAwareC2UrlProviderIsCreated() {
// given
when(clientConfig.getC2RestPathBase()).thenReturn(C2_REST_BASE);
when(clientConfig.getC2RestPathHeartbeat()).thenReturn(HEARTBEAT_PATH);
when(clientConfig.getC2RestPathAcknowledge()).thenReturn(ACKNOWLEDGE_PATH);
// when
C2UrlProvider c2UrlProvider = testC2UrlProviderFactory.create();
// then
assertInstanceOf(ProxyAwareC2UrlProvider.class, c2UrlProvider);
}
@Test
public void testLegacyC2UrlProviderIsCreated() {
// given
when(clientConfig.getC2Url()).thenReturn(C2_REST_BASE + HEARTBEAT_PATH);
when(clientConfig.getC2AckUrl()).thenReturn(C2_REST_BASE + ACKNOWLEDGE_PATH);
// when
C2UrlProvider c2UrlProvider = testC2UrlProviderFactory.create();
// then
assertInstanceOf(LegacyC2UrlProvider.class, c2UrlProvider);
}
@Test
public void testProxyAwareProviderTakesPrecedenceOverLegacy() {
// given
lenient().when(clientConfig.getC2RestPathBase()).thenReturn(C2_REST_BASE);
lenient().when(clientConfig.getC2RestPathHeartbeat()).thenReturn(HEARTBEAT_PATH);
lenient().when(clientConfig.getC2RestPathAcknowledge()).thenReturn(ACKNOWLEDGE_PATH);
lenient().when(clientConfig.getC2Url()).thenReturn(C2_REST_BASE + HEARTBEAT_PATH);
lenient().when(clientConfig.getC2AckUrl()).thenReturn(C2_REST_BASE + ACKNOWLEDGE_PATH);
// when
C2UrlProvider c2UrlProvider = testC2UrlProviderFactory.create();
// then
assertInstanceOf(ProxyAwareC2UrlProvider.class, c2UrlProvider);
}
@Test
public void testInsufficientConfigurationResultsInException() {
// given
when(clientConfig.getC2RestPathBase()).thenReturn(C2_REST_BASE);
when(clientConfig.getC2Url()).thenReturn(C2_REST_BASE + HEARTBEAT_PATH);
// when + then
assertThrowsExactly(IllegalArgumentException.class, testC2UrlProviderFactory::create);
}
}

View File

@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.c2.client.http.url;
import static org.junit.jupiter.api.Assertions.assertEquals;
import java.util.Optional;
import java.util.stream.Stream;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.junit.jupiter.MockitoExtension;
@ExtendWith(MockitoExtension.class)
public class LegacyC2UrlProviderTest {
private static final String C2_HEARTBEAT_URL = "https://host:8080/c2/api/heartbeat";
private static final String C2_ACKNOWLEDGE_URL = "https://host:8080/c2/api/acknowledge";
@Test
public void testProviderIsCreatedAndReturnsProperHeartbeatAndAcknowledgeUrls() {
LegacyC2UrlProvider testProvider = new LegacyC2UrlProvider(C2_HEARTBEAT_URL, C2_ACKNOWLEDGE_URL);
assertEquals(C2_HEARTBEAT_URL, testProvider.getHeartbeatUrl());
assertEquals(C2_ACKNOWLEDGE_URL, testProvider.getAcknowledgeUrl());
}
@MethodSource("testCallbackUrlProvidedArguments")
@ParameterizedTest(name = "{index} => absoluteUrl={0}, relativeUrl={1}, expectedCallbackUrl={2}")
public void testCallbackUrlProvidedFor(String absoluteUrl, String relativeUrl, Optional<String> expectedCallbackUrl) {
LegacyC2UrlProvider testProvider = new LegacyC2UrlProvider(C2_HEARTBEAT_URL, C2_ACKNOWLEDGE_URL);
assertEquals(expectedCallbackUrl, testProvider.getCallbackUrl(absoluteUrl, relativeUrl));
}
private static Stream<Arguments> testCallbackUrlProvidedArguments() {
return Stream.of(
Arguments.of(null, null, Optional.empty()),
Arguments.of(null, "any_url", Optional.empty()),
Arguments.of("", "", Optional.empty()),
Arguments.of("", "any_url", Optional.empty()),
Arguments.of("http://c2/api/callback", "any_url", Optional.of("http://c2/api/callback"))
);
}
}

View File

@ -0,0 +1,113 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.c2.client.http.url;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrowsExactly;
import java.util.Optional;
import java.util.stream.Stream;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.junit.jupiter.MockitoExtension;
@ExtendWith(MockitoExtension.class)
public class ProxyAwareC2ProviderTest {
@MethodSource("testInsufficientProviderConstructorArguments")
@ParameterizedTest(name = "{index} => c2RestApi={0}, c2RestPathHeartbeat={1}, c2RestPathAcknowledge={2}")
public void testExceptionIsThrownWhenUrlsCanNotBeCreatedFromInputParameters(String c2RestApi, String c2RestPathHeartbeat, String c2RestPathAcknowledge) {
assertThrowsExactly(IllegalArgumentException.class, () -> new ProxyAwareC2UrlProvider(c2RestApi, c2RestPathHeartbeat, c2RestPathAcknowledge));
}
private static Stream<Arguments> testInsufficientProviderConstructorArguments() {
return Stream.of(
Arguments.of(null, null, null),
Arguments.of(null, null, ""),
Arguments.of(null, "", ""),
Arguments.of("", "", ""),
Arguments.of("", "", null),
Arguments.of("", null, null),
Arguments.of("http://c2/api", null, null),
Arguments.of("http://c2/api", "", null),
Arguments.of("http://c2/api", null, ""),
Arguments.of("http://c2/api", "", ""),
Arguments.of(null, "path1", null),
Arguments.of(null, null, "path2"),
Arguments.of(null, "path1", "path2"),
Arguments.of("invalid_url/api", "path1", "path2")
);
}
@MethodSource("testValidProviderConstructorArguments")
@ParameterizedTest(name = "{index} => c2RestApi={0}, c2RestPathHeartbeat={1}, c2RestPathAcknowledge={2}, expectedHeartbeatUrl={3}, expectedAcknowledgeUrl={4}")
public void testUrlProviderIsCreatedAndHeartbeatAndAcknowledgeUrlsAreReturnedCorrectly(String c2RestApi, String c2RestPathHeartbeat, String c2RestPathAcknowledge,
String expectedHeartbeatUrl, String expectedAcknowledgeUrl) {
ProxyAwareC2UrlProvider testProvider = new ProxyAwareC2UrlProvider(c2RestApi, c2RestPathHeartbeat, c2RestPathAcknowledge);
assertEquals(expectedHeartbeatUrl, testProvider.getHeartbeatUrl());
assertEquals(expectedAcknowledgeUrl, testProvider.getAcknowledgeUrl());
}
private static Stream<Arguments> testValidProviderConstructorArguments() {
String expectedHearbeatUrl = "http://c2/api/path1";
String expectedAckUrl = "http://c2/api/path2";
return Stream.of(
Arguments.of("http://c2/api", "path1", "path2", expectedHearbeatUrl, expectedAckUrl),
Arguments.of("http://c2/api", "/path1", "path2", expectedHearbeatUrl, expectedAckUrl),
Arguments.of("http://c2/api", "path1", "/path2", expectedHearbeatUrl, expectedAckUrl),
Arguments.of("http://c2/api", "/path1", "/path2", expectedHearbeatUrl, expectedAckUrl),
Arguments.of("http://c2/api/", "path1", "path2", expectedHearbeatUrl, expectedAckUrl),
Arguments.of("http://c2/api/", "/path1", "path2", expectedHearbeatUrl, expectedAckUrl),
Arguments.of("http://c2/api/", "path1", "/path2", expectedHearbeatUrl, expectedAckUrl),
Arguments.of("http://c2/api/", "/path1", "/path2", expectedHearbeatUrl, expectedAckUrl)
);
}
@MethodSource("testCallbackUrlProvidedArguments")
@ParameterizedTest(name = "{index} => c2RestBase={0}, absoluteUrl={1}, relativeUrl={2}, expectedCallbackUrl={3}")
public void testCallbackUrlProvidedFor(String c2RestBase, String absoluteUrl, String relativeUrl, Optional<String> expectedCallbackUrl) {
ProxyAwareC2UrlProvider testProvider = new ProxyAwareC2UrlProvider(c2RestBase, "any_path", "any_path");
assertEquals(expectedCallbackUrl, testProvider.getCallbackUrl(absoluteUrl, relativeUrl));
}
private static Stream<Arguments> testCallbackUrlProvidedArguments() {
String c2RestBaseNoTrailingSlash = "http://c2/api";
String c2RestBaseWithTrailingSlash = "http://c2/api/";
String path = "path/endpoint";
String absoluteUrl = "http://c2-other/api/path/endpoint";
return Stream.of(
Arguments.of(c2RestBaseNoTrailingSlash, null, null, Optional.empty()),
Arguments.of(c2RestBaseNoTrailingSlash, "", null, Optional.empty()),
Arguments.of(c2RestBaseNoTrailingSlash, null, "", Optional.empty()),
Arguments.of(c2RestBaseNoTrailingSlash, "", "", Optional.empty()),
Arguments.of(c2RestBaseWithTrailingSlash, null, null, Optional.empty()),
Arguments.of(c2RestBaseWithTrailingSlash, "", null, Optional.empty()),
Arguments.of(c2RestBaseWithTrailingSlash, null, "", Optional.empty()),
Arguments.of(c2RestBaseWithTrailingSlash, "", "", Optional.empty()),
Arguments.of(c2RestBaseNoTrailingSlash, null, path, Optional.of(c2RestBaseWithTrailingSlash + path)),
Arguments.of(c2RestBaseNoTrailingSlash, "", "/" + path, Optional.of(c2RestBaseWithTrailingSlash + path)),
Arguments.of(c2RestBaseWithTrailingSlash, null, path, Optional.of(c2RestBaseWithTrailingSlash + path)),
Arguments.of(c2RestBaseWithTrailingSlash, "", "/" + path, Optional.of(c2RestBaseWithTrailingSlash + path)),
Arguments.of(c2RestBaseWithTrailingSlash, absoluteUrl, null, Optional.of(absoluteUrl)),
Arguments.of(c2RestBaseWithTrailingSlash, absoluteUrl, "", Optional.of(absoluteUrl))
);
}
}

View File

@ -67,6 +67,7 @@ public class TransferDebugOperationHandler implements C2OperationHandler {
private static final String UNABLE_TO_CREATE_BUNDLE = "Unable to create debug bundle";
static final String TARGET_ARG = "target";
static final String RELATIVE_TARGET_ARG = "relativeTarget";
static final String NEW_LINE = "\n";
private final C2Client c2Client;
@ -116,9 +117,10 @@ public class TransferDebugOperationHandler implements C2OperationHandler {
@Override
public C2OperationAck handle(C2Operation operation) {
String debugCallbackUrl = ofNullable(operation.getArgs()).orElse(emptyMap()).get(TARGET_ARG);
if (debugCallbackUrl == null) {
LOG.error("Callback URL was not found in C2 request.");
Map<String, String> arguments = ofNullable(operation.getArgs()).orElse(emptyMap());
Optional<String> callbackUrl = c2Client.getCallbackUrl(arguments.get(TARGET_ARG), arguments.get(RELATIVE_TARGET_ARG));
if (!callbackUrl.isPresent()) {
LOG.error("Callback URL could not be constructed from C2 request and current configuration");
return operationAck(operation, operationState(NOT_APPLIED, C2_CALLBACK_URL_NOT_FOUND));
}
@ -127,7 +129,7 @@ public class TransferDebugOperationHandler implements C2OperationHandler {
try {
contentFilteredFilePaths = filterContent(operation.getIdentifier(), bundleFilePaths);
operationState = createDebugBundle(contentFilteredFilePaths)
.map(bundle -> c2Client.uploadBundle(debugCallbackUrl, bundle)
.map(bundle -> c2Client.uploadBundle(callbackUrl.get(), bundle)
.map(errorMessage -> operationState(NOT_APPLIED, errorMessage))
.orElseGet(() -> operationState(FULLY_APPLIED, SUCCESSFUL_UPLOAD)))
.orElseGet(() -> operationState(NOT_APPLIED, UNABLE_TO_CREATE_BUNDLE));

View File

@ -28,6 +28,7 @@ import static org.apache.nifi.c2.protocol.api.OperandType.ASSET;
import static org.apache.nifi.c2.protocol.api.OperationType.UPDATE;
import java.util.Map;
import java.util.Optional;
import java.util.function.BiFunction;
import java.util.function.BiPredicate;
import org.apache.nifi.c2.client.api.C2Client;
@ -43,6 +44,7 @@ import org.slf4j.LoggerFactory;
public class UpdateAssetOperationHandler implements C2OperationHandler {
static final String ASSET_URL_KEY = "url";
static final String ASSET_RELATIVE_URL_KEY = "relativeUrl";
static final String ASSET_FILE_KEY = "file";
static final String ASSET_FORCE_DOWNLOAD_KEY = "forceDownload";
@ -55,14 +57,14 @@ public class UpdateAssetOperationHandler implements C2OperationHandler {
private static final Logger LOG = LoggerFactory.getLogger(UpdateAssetOperationHandler.class);
private final C2Client client;
private final C2Client c2Client;
private final OperandPropertiesProvider operandPropertiesProvider;
private final BiPredicate<String, Boolean> assetUpdatePrecondition;
private final BiFunction<String, byte[], Boolean> assetPersistFunction;
public UpdateAssetOperationHandler(C2Client client, OperandPropertiesProvider operandPropertiesProvider,
public UpdateAssetOperationHandler(C2Client c2Client, OperandPropertiesProvider operandPropertiesProvider,
BiPredicate<String, Boolean> assetUpdatePrecondition, BiFunction<String, byte[], Boolean> assetPersistFunction) {
this.client = client;
this.c2Client = c2Client;
this.operandPropertiesProvider = operandPropertiesProvider;
this.assetUpdatePrecondition = assetUpdatePrecondition;
this.assetPersistFunction = assetPersistFunction;
@ -105,11 +107,12 @@ public class UpdateAssetOperationHandler implements C2OperationHandler {
public C2OperationAck handle(C2Operation operation) {
String operationId = ofNullable(operation.getIdentifier()).orElse(EMPTY);
String assetUrl = getOperationArg(operation, ASSET_URL_KEY);
if (assetUrl == null) {
LOG.error("Callback URL with key={} was not found in C2 request. C2 request arguments={}", ASSET_URL_KEY, operation.getArgs());
Optional<String> callbackUrl = c2Client.getCallbackUrl(getOperationArg(operation, ASSET_URL_KEY), getOperationArg(operation, ASSET_RELATIVE_URL_KEY));
if (!callbackUrl.isPresent()) {
LOG.error("Callback URL could not be constructed from C2 request and current configuration");
return operationAck(operationId, operationState(NOT_APPLIED, C2_CALLBACK_URL_NOT_FOUND));
}
String assetFileName = getOperationArg(operation, ASSET_FILE_KEY);
if (assetFileName == null) {
LOG.error("Asset file name with key={} was not found in C2 request. C2 request arguments={}", ASSET_FILE_KEY, operation.getArgs());
@ -117,14 +120,14 @@ public class UpdateAssetOperationHandler implements C2OperationHandler {
}
boolean forceDownload = parseBoolean(getOperationArg(operation, ASSET_FORCE_DOWNLOAD_KEY));
LOG.info("Initiating asset update from url {} with name {}, force update is {}", assetUrl, assetFileName, forceDownload);
LOG.info("Initiating asset update from url {} with name {}, force update is {}", callbackUrl, assetFileName, forceDownload);
C2OperationState operationState = assetUpdatePrecondition.test(assetFileName, forceDownload)
? client.retrieveUpdateContent(assetUrl)
.map(content -> assetPersistFunction.apply(assetFileName, content)
? operationState(FULLY_APPLIED, SUCCESSFULLY_UPDATE_ASSET)
: operationState(NOT_APPLIED, FAILED_TO_PERSIST_ASSET_TO_DISK))
.orElseGet(() -> operationState(NOT_APPLIED, UPDATE_ASSET_RETRIEVAL_RESULTED_IN_EMPTY_CONTENT))
? c2Client.retrieveUpdateContent(callbackUrl.get())
.map(content -> assetPersistFunction.apply(assetFileName, content)
? operationState(FULLY_APPLIED, SUCCESSFULLY_UPDATE_ASSET)
: operationState(NOT_APPLIED, FAILED_TO_PERSIST_ASSET_TO_DISK))
.orElseGet(() -> operationState(NOT_APPLIED, UPDATE_ASSET_RETRIEVAL_RESULTED_IN_EMPTY_CONTENT))
: operationState(NO_OPERATION, UPDATE_ASSET_PRECONDITIONS_WERE_NOT_MET);
return operationAck(operationId, operationState);

View File

@ -14,9 +14,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.c2.client.service.operation;
import static java.util.Collections.emptyMap;
import static java.util.Optional.ofNullable;
import static org.apache.commons.lang3.StringUtils.EMPTY;
import static org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.FULLY_APPLIED;
import static org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.NOT_APPLIED;
import static org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.NO_OPERATION;
import static org.apache.nifi.c2.protocol.api.OperandType.CONFIGURATION;
import static org.apache.nifi.c2.protocol.api.OperationType.UPDATE;
@ -41,6 +47,8 @@ public class UpdateConfigurationOperationHandler implements C2OperationHandler {
private static final Pattern FLOW_ID_PATTERN = Pattern.compile("/[^/]+?/[^/]+?/[^/]+?/([^/]+)?/?.*");
static final String FLOW_ID = "flowId";
static final String LOCATION = "location";
public static final String FLOW_URL_KEY = "flowUrl";
public static final String FLOW_RELATIVE_URL_KEY = "relativeFlowUrl";
private final C2Client client;
private final Function<byte[], Boolean> updateFlow;
@ -48,7 +56,7 @@ public class UpdateConfigurationOperationHandler implements C2OperationHandler {
private final OperandPropertiesProvider operandPropertiesProvider;
public UpdateConfigurationOperationHandler(C2Client client, FlowIdHolder flowIdHolder, Function<byte[], Boolean> updateFlow,
OperandPropertiesProvider operandPropertiesProvider) {
OperandPropertiesProvider operandPropertiesProvider) {
this.client = client;
this.updateFlow = updateFlow;
this.flowIdHolder = flowIdHolder;
@ -65,73 +73,6 @@ public class UpdateConfigurationOperationHandler implements C2OperationHandler {
return CONFIGURATION;
}
@Override
public C2OperationAck handle(C2Operation operation) {
String opIdentifier = Optional.ofNullable(operation.getIdentifier())
.orElse(EMPTY);
C2OperationAck operationAck = new C2OperationAck();
C2OperationState state = new C2OperationState();
operationAck.setOperationState(state);
operationAck.setOperationId(opIdentifier);
String updateLocation = Optional.ofNullable(operation.getArgs())
.map(map -> map.get(LOCATION))
.orElse(EMPTY);
String flowId = getFlowId(operation.getArgs(), updateLocation);
if (flowId == null) {
state.setState(C2OperationState.OperationState.NOT_APPLIED);
state.setDetails("Could not get flowId from the operation.");
logger.info("FlowId is missing, no update will be performed.");
} else {
if (flowIdHolder.getFlowId() == null || !flowIdHolder.getFlowId().equals(flowId)) {
logger.info("Will perform flow update from {} for operation #{}. Previous flow id was {}, replacing with new id {}", updateLocation, opIdentifier,
flowIdHolder.getFlowId() == null ? "not set" : flowIdHolder.getFlowId(), flowId);
} else {
logger.info("Flow is current, no update is necessary...");
}
flowIdHolder.setFlowId(flowId);
state.setState(updateFlow(opIdentifier, updateLocation));
}
return operationAck;
}
private C2OperationState.OperationState updateFlow(String opIdentifier, String updateLocation) {
Optional<byte[]> updateContent = client.retrieveUpdateContent(updateLocation);
if (updateContent.isPresent()) {
if (updateFlow.apply(updateContent.get())) {
logger.debug("Update configuration applied for operation #{}.", opIdentifier);
return C2OperationState.OperationState.FULLY_APPLIED;
} else {
logger.error("Update resulted in error for operation #{}.", opIdentifier);
return C2OperationState.OperationState.NOT_APPLIED;
}
} else {
logger.error("Update content retrieval resulted in empty content so flow update was omitted for operation #{}.", opIdentifier);
return C2OperationState.OperationState.NOT_APPLIED;
}
}
private String getFlowId(Map<String, String> args, String updateLocation) {
return Optional.ofNullable(args)
.map(map -> map.get(FLOW_ID))
.orElseGet(() -> parseFlowId(updateLocation));
}
private String parseFlowId(String flowUpdateUrl) {
try {
URI flowUri = new URI(flowUpdateUrl);
Matcher matcher = FLOW_ID_PATTERN.matcher(flowUri.getPath());
if (matcher.matches()) {
return matcher.group(1);
}
} catch (Exception e) {
logger.error("Could not get flow id from the provided URL, flow update URL format unexpected [{}]", flowUpdateUrl);
}
return null;
}
@Override
public Map<String, Object> getProperties() {
return operandPropertiesProvider.getProperties();
@ -141,4 +82,84 @@ public class UpdateConfigurationOperationHandler implements C2OperationHandler {
public boolean requiresRestart() {
return true;
}
@Override
public C2OperationAck handle(C2Operation operation) {
String operationId = Optional.ofNullable(operation.getIdentifier()).orElse(EMPTY);
Map<String, String> arguments = ofNullable(operation.getArgs()).orElse(emptyMap());
String absoluteFlowUrl = ofNullable(arguments.get(FLOW_URL_KEY)).orElse(arguments.get(LOCATION));
Optional<String> callbackUrl = client.getCallbackUrl(absoluteFlowUrl, arguments.get(FLOW_RELATIVE_URL_KEY));
if (!callbackUrl.isPresent()) {
logger.error("Callback URL could not be constructed from C2 request and current configuration");
return operationAck(operationId, operationState(NOT_APPLIED, "Could not get callback url from operation and current configuration"));
}
String flowId = getFlowId(operation.getArgs(), callbackUrl.get());
if (flowId == null) {
logger.error("FlowId is missing, no update will be performed");
return operationAck(operationId, operationState(NOT_APPLIED, "Could not get flowId from the operation"));
}
if (flowIdHolder.getFlowId() != null && flowIdHolder.getFlowId().equals(flowId)) {
logger.info("Flow is current, no update is necessary");
return operationAck(operationId, operationState(NO_OPERATION, "Flow is current, no update is necessary"));
}
logger.info("Will perform flow update from {} for operation #{}. Previous flow id was {}, replacing with new id {}",
callbackUrl, operationId, ofNullable(flowIdHolder.getFlowId()).orElse("not set"), flowId);
flowIdHolder.setFlowId(flowId);
return operationAck(operationId, updateFlow(operationId, callbackUrl.get()));
}
private C2OperationState updateFlow(String opIdentifier, String callbackUrl) {
Optional<byte[]> updateContent = client.retrieveUpdateContent(callbackUrl);
if (!updateContent.isPresent()) {
logger.error("Update content retrieval resulted in empty content so flow update was omitted for operation #{}.", opIdentifier);
return operationState(NOT_APPLIED, "Update content retrieval resulted in empty content");
}
if (!updateFlow.apply(updateContent.get())) {
logger.error("Update resulted in error for operation #{}.", opIdentifier);
return operationState(NOT_APPLIED, "Update resulted in error");
}
logger.debug("Update configuration applied for operation #{}.", opIdentifier);
return operationState(FULLY_APPLIED, "Update configuration applied successfully");
}
private String getFlowId(Map<String, String> args, String callbackUrl) {
return Optional.ofNullable(args)
.map(map -> map.get(FLOW_ID))
.orElseGet(() -> parseFlowId(callbackUrl));
}
private String parseFlowId(String callbackUrl) {
try {
URI flowUri = new URI(callbackUrl);
Matcher matcher = FLOW_ID_PATTERN.matcher(flowUri.getPath());
if (matcher.matches()) {
return matcher.group(1);
}
} catch (Exception e) {
logger.error("Could not get flow id from the provided URL, flow update URL format unexpected [{}]", callbackUrl);
}
return null;
}
private C2OperationState operationState(C2OperationState.OperationState operationState, String details) {
C2OperationState state = new C2OperationState();
state.setState(operationState);
state.setDetails(details);
return state;
}
private C2OperationAck operationAck(String operationId, C2OperationState operationState) {
C2OperationAck operationAck = new C2OperationAck();
operationAck.setOperationState(operationState);
operationAck.setOperationId(operationId);
return operationAck;
}
}

View File

@ -35,8 +35,10 @@ import static org.apache.nifi.c2.protocol.api.OperationType.TRANSFER;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
@ -49,6 +51,7 @@ import java.nio.file.Paths;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Predicate;
import java.util.stream.Stream;
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
@ -136,6 +139,7 @@ public class TransferDebugOperationHandlerTest {
.collect(toList());
TransferDebugOperationHandler testHandler = TransferDebugOperationHandler.create(c2Client, operandPropertiesProvider, createBundleFiles, DEFAULT_CONTENT_FILTER);
C2Operation c2Operation = operation(C2_DEBUG_UPLOAD_ENDPOINT);
when(c2Client.getCallbackUrl(any(), any())).thenReturn(Optional.of(C2_DEBUG_UPLOAD_ENDPOINT));
// when
C2OperationAck result = testHandler.handle(c2Operation);
@ -196,6 +200,7 @@ public class TransferDebugOperationHandlerTest {
Predicate<String> testContentFilter = content -> !content.contains(filterKeyword);
TransferDebugOperationHandler testHandler = TransferDebugOperationHandler.create(c2Client, operandPropertiesProvider, singletonList(bundleFile), testContentFilter);
C2Operation c2Operation = operation(C2_DEBUG_UPLOAD_ENDPOINT);
when(c2Client.getCallbackUrl(any(), any())).thenReturn(Optional.of(C2_DEBUG_UPLOAD_ENDPOINT));
// when
C2OperationAck result = testHandler.handle(c2Operation);

View File

@ -37,6 +37,8 @@ import static org.apache.nifi.c2.protocol.api.OperandType.ASSET;
import static org.apache.nifi.c2.protocol.api.OperationType.UPDATE;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@ -49,6 +51,7 @@ import java.util.stream.Stream;
import org.apache.nifi.c2.client.api.C2Client;
import org.apache.nifi.c2.protocol.api.C2Operation;
import org.apache.nifi.c2.protocol.api.C2OperationAck;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
@ -89,6 +92,11 @@ public class UpdateAssetOperationHandlerTest {
Arguments.of(mock(C2Client.class), mock(OperandPropertiesProvider.class), mock(BiPredicate.class), null));
}
@BeforeEach
public void setup() {
lenient().when(c2Client.getCallbackUrl(any(), any())).thenReturn(Optional.of(ASSET_URL));
}
@ParameterizedTest(name = "c2Client={0} operandPropertiesProvider={1} bundleFileList={2} contentFilter={3}")
@MethodSource("invalidConstructorArguments")
public void testAttemptingCreateWithInvalidParametersWillThrowException(C2Client c2Client, OperandPropertiesProvider operandPropertiesProvider,
@ -107,6 +115,7 @@ public class UpdateAssetOperationHandlerTest {
public void testAssetUrlCanNotBeNull() {
// given
C2Operation operation = operation(null, ASSET_FILE_NAME, FORCE_DOWNLOAD);
when(c2Client.getCallbackUrl(any(), any())).thenReturn(Optional.empty());
// when
C2OperationAck result = testHandler.handle(operation);

View File

@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.c2.client.service.operation;
import static org.apache.commons.lang3.StringUtils.EMPTY;
@ -21,6 +22,8 @@ import static org.apache.nifi.c2.client.service.operation.UpdateConfigurationOpe
import static org.apache.nifi.c2.client.service.operation.UpdateConfigurationOperationHandler.LOCATION;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.util.Collections;
@ -43,15 +46,17 @@ import org.mockito.junit.jupiter.MockitoExtension;
@ExtendWith(MockitoExtension.class)
public class UpdateConfigurationOperationHandlerTest {
private static final String OPERATION_ID = "operationId";
private static final Map<String, String> CORRECT_LOCATION_MAP = Collections.singletonMap(LOCATION, "/path/for/the/" + FLOW_ID);
private static final Map<String, String> INCORRECT_LOCATION_MAP = Collections.singletonMap(LOCATION, "incorrect/location");
private static final String CORRECT_LOCATION = "/path/for/the/" + FLOW_ID;
private static final String INCORRECT_LOCATION = "incorrect/location";
@Mock
private C2Client client;
private static final Map<String, String> CORRECT_LOCATION_MAP = Collections.singletonMap(LOCATION, CORRECT_LOCATION);
private static final Map<String, String> INCORRECT_LOCATION_MAP = Collections.singletonMap(LOCATION, INCORRECT_LOCATION);
@Mock
private FlowIdHolder flowIdHolder;
@Mock
private C2Client client;
@Mock
private OperandPropertiesProvider operandPropertiesProvider;
@Test
@ -64,10 +69,12 @@ public class UpdateConfigurationOperationHandlerTest {
@Test
void testHandleIncorrectArg() {
UpdateConfigurationOperationHandler handler = new UpdateConfigurationOperationHandler(null, null, null, operandPropertiesProvider);
UpdateConfigurationOperationHandler handler = new UpdateConfigurationOperationHandler(client, null, null, operandPropertiesProvider);
C2Operation operation = new C2Operation();
operation.setArgs(INCORRECT_LOCATION_MAP);
when(client.getCallbackUrl(any(), any())).thenReturn(Optional.of(INCORRECT_LOCATION));
C2OperationAck response = handler.handle(operation);
assertEquals(C2OperationState.OperationState.NOT_APPLIED, response.getOperationState().getState());
@ -78,6 +85,7 @@ public class UpdateConfigurationOperationHandlerTest {
Function<byte[], Boolean> successUpdate = x -> true;
when(flowIdHolder.getFlowId()).thenReturn(FLOW_ID);
when(client.retrieveUpdateContent(any())).thenReturn(Optional.of("content".getBytes()));
when(client.getCallbackUrl(any(), any())).thenReturn(Optional.of(INCORRECT_LOCATION));
UpdateConfigurationOperationHandler handler = new UpdateConfigurationOperationHandler(client, flowIdHolder, successUpdate, operandPropertiesProvider);
C2Operation operation = new C2Operation();
operation.setIdentifier(OPERATION_ID);
@ -94,9 +102,9 @@ public class UpdateConfigurationOperationHandlerTest {
}
@Test
void testHandleReturnsNotAppliedWithNoContent() {
void testHandleReturnsNoOperationWithNoContent() {
when(flowIdHolder.getFlowId()).thenReturn(FLOW_ID);
when(client.retrieveUpdateContent(any())).thenReturn(Optional.empty());
when(client.getCallbackUrl(any(), any())).thenReturn(Optional.of(CORRECT_LOCATION));
UpdateConfigurationOperationHandler handler = new UpdateConfigurationOperationHandler(client, flowIdHolder, null, operandPropertiesProvider);
C2Operation operation = new C2Operation();
operation.setArgs(CORRECT_LOCATION_MAP);
@ -104,14 +112,15 @@ public class UpdateConfigurationOperationHandlerTest {
C2OperationAck response = handler.handle(operation);
assertEquals(EMPTY, response.getOperationId());
assertEquals(C2OperationState.OperationState.NOT_APPLIED, response.getOperationState().getState());
assertEquals(C2OperationState.OperationState.NO_OPERATION, response.getOperationState().getState());
}
@Test
void testHandleReturnsNotAppliedWithContentApplyIssues() {
Function<byte[], Boolean> failedToUpdate = x -> false;
when(flowIdHolder.getFlowId()).thenReturn(FLOW_ID);
when(flowIdHolder.getFlowId()).thenReturn("previous_flow_id");
when(client.retrieveUpdateContent(any())).thenReturn(Optional.of("content".getBytes()));
when(client.getCallbackUrl(any(), any())).thenReturn(Optional.of(CORRECT_LOCATION));
UpdateConfigurationOperationHandler handler = new UpdateConfigurationOperationHandler(client, flowIdHolder, failedToUpdate, operandPropertiesProvider);
C2Operation operation = new C2Operation();
operation.setIdentifier(OPERATION_ID);
@ -126,7 +135,8 @@ public class UpdateConfigurationOperationHandlerTest {
@Test
void testHandleReturnsFullyApplied() {
Function<byte[], Boolean> successUpdate = x -> true;
when(flowIdHolder.getFlowId()).thenReturn(FLOW_ID);
when(flowIdHolder.getFlowId()).thenReturn("previous_flow_id");
when(client.getCallbackUrl(any(), any())).thenReturn(Optional.of(CORRECT_LOCATION));
when(client.retrieveUpdateContent(any())).thenReturn(Optional.of("content".getBytes()));
UpdateConfigurationOperationHandler handler = new UpdateConfigurationOperationHandler(client, flowIdHolder, successUpdate, operandPropertiesProvider);
C2Operation operation = new C2Operation();
@ -135,6 +145,7 @@ public class UpdateConfigurationOperationHandlerTest {
C2OperationAck response = handler.handle(operation);
verify(flowIdHolder, times(1)).setFlowId(FLOW_ID);
assertEquals(OPERATION_ID, response.getOperationId());
assertEquals(C2OperationState.OperationState.FULLY_APPLIED, response.getOperationState().getState());
}

View File

@ -85,6 +85,9 @@ public enum MiNiFiProperties {
C2_ENABLE("c2.enable", "false", false, true, BOOLEAN_VALIDATOR),
C2_REST_URL("c2.rest.url", "", false, true, VALID),
C2_REST_URL_ACK("c2.rest.url.ack", "", false, true, VALID),
C2_REST_PATH_BASE("c2.rest.path.base", "", false, true, VALID),
C2_REST_PATH_HEARTBEAT("c2.rest.path.heartbeat", "", false, true, VALID),
C2_REST_PATH_ACKNOWLEDGE("c2.rest.path.acknowledge", "", false, true, VALID),
C2_REST_CONNECTION_TIMEOUT("c2.rest.connectionTimeout", "5 sec", false, true, TIME_PERIOD_VALIDATOR),
C2_REST_READ_TIMEOUT("c2.rest.readTimeout", "5 sec", false, true, TIME_PERIOD_VALIDATOR),
C2_REST_CALL_TIMEOUT("c2.rest.callTimeout", "10 sec", false, true, TIME_PERIOD_VALIDATOR),

View File

@ -27,8 +27,11 @@ import static org.apache.nifi.minifi.MiNiFiProperties.C2_FULL_HEARTBEAT;
import static org.apache.nifi.minifi.MiNiFiProperties.C2_KEEP_ALIVE_DURATION;
import static org.apache.nifi.minifi.MiNiFiProperties.C2_MAX_IDLE_CONNECTIONS;
import static org.apache.nifi.minifi.MiNiFiProperties.C2_REQUEST_COMPRESSION;
import static org.apache.nifi.minifi.MiNiFiProperties.C2_REST_PATH_BASE;
import static org.apache.nifi.minifi.MiNiFiProperties.C2_REST_CALL_TIMEOUT;
import static org.apache.nifi.minifi.MiNiFiProperties.C2_REST_CONNECTION_TIMEOUT;
import static org.apache.nifi.minifi.MiNiFiProperties.C2_REST_PATH_ACKNOWLEDGE;
import static org.apache.nifi.minifi.MiNiFiProperties.C2_REST_PATH_HEARTBEAT;
import static org.apache.nifi.minifi.MiNiFiProperties.C2_REST_READ_TIMEOUT;
import static org.apache.nifi.minifi.MiNiFiProperties.C2_REST_URL;
import static org.apache.nifi.minifi.MiNiFiProperties.C2_REST_URL_ACK;
@ -138,7 +141,6 @@ public class C2NifiClientService {
private final PropertiesPersister propertiesPersister;
private final ObjectMapper objectMapper;
private final long heartbeatPeriod;
public C2NifiClientService(NiFiProperties niFiProperties, FlowController flowController, BootstrapCommunicator bootstrapCommunicator) {
@ -154,7 +156,7 @@ public class C2NifiClientService {
this.heartbeatPeriod = clientConfig.getHeartbeatPeriod();
this.flowController = flowController;
C2HttpClient client = new C2HttpClient(clientConfig, new C2JacksonSerializer());
C2HttpClient client = C2HttpClient.create(clientConfig, new C2JacksonSerializer());
C2HeartbeatFactory heartbeatFactory = new C2HeartbeatFactory(clientConfig, flowIdHolder, new ManifestHashProvider());
OperandPropertiesProvider emptyOperandPropertiesProvider = new EmptyOperandPropertiesProvider();
TransferDebugCommandHelper transferDebugCommandHelper = new TransferDebugCommandHelper(niFiProperties);
@ -196,19 +198,22 @@ public class C2NifiClientService {
.maxIdleConnections(Integer.parseInt(properties.getProperty(C2_MAX_IDLE_CONNECTIONS.getKey(), C2_MAX_IDLE_CONNECTIONS.getDefaultValue())))
.keepAliveDuration((long) FormatUtils.getPreciseTimeDuration(properties.getProperty(C2_KEEP_ALIVE_DURATION.getKey(),
C2_KEEP_ALIVE_DURATION.getDefaultValue()), TimeUnit.MILLISECONDS))
.c2Url(properties.getProperty(C2_REST_URL.getKey(), C2_REST_URL.getDefaultValue()))
.c2RequestCompression(properties.getProperty(C2_REQUEST_COMPRESSION.getKey(), C2_REQUEST_COMPRESSION.getDefaultValue()))
.c2AssetDirectory(properties.getProperty(C2_ASSET_DIRECTORY.getKey(), C2_ASSET_DIRECTORY.getDefaultValue()))
.confDirectory(properties.getProperty(C2_CONFIG_DIRECTORY.getKey(), C2_CONFIG_DIRECTORY.getDefaultValue()))
.runtimeManifestIdentifier(properties.getProperty(C2_RUNTIME_MANIFEST_IDENTIFIER.getKey(), C2_RUNTIME_MANIFEST_IDENTIFIER.getDefaultValue()))
.runtimeType(properties.getProperty(C2_RUNTIME_TYPE.getKey(), C2_RUNTIME_TYPE.getDefaultValue()))
.c2AckUrl(properties.getProperty(C2_REST_URL_ACK.getKey(), C2_REST_URL_ACK.getDefaultValue()))
.truststoreFilename(properties.getProperty(C2_SECURITY_TRUSTSTORE_LOCATION.getKey(), C2_SECURITY_TRUSTSTORE_LOCATION.getDefaultValue()))
.truststorePassword(properties.getProperty(C2_SECURITY_TRUSTSTORE_PASSWORD.getKey(), C2_SECURITY_TRUSTSTORE_PASSWORD.getDefaultValue()))
.truststoreType(properties.getProperty(C2_SECURITY_TRUSTSTORE_TYPE.getKey(), C2_SECURITY_TRUSTSTORE_TYPE.getDefaultValue()))
.keystoreFilename(properties.getProperty(C2_SECURITY_KEYSTORE_LOCATION.getKey(), C2_SECURITY_KEYSTORE_LOCATION.getDefaultValue()))
.keystorePassword(properties.getProperty(C2_SECURITY_KEYSTORE_PASSWORD.getKey(), C2_SECURITY_KEYSTORE_PASSWORD.getDefaultValue()))
.keystoreType(properties.getProperty(C2_SECURITY_KEYSTORE_TYPE.getKey(), C2_SECURITY_KEYSTORE_TYPE.getDefaultValue()))
.c2Url(properties.getProperty(C2_REST_URL.getKey(), C2_REST_URL.getDefaultValue()))
.c2AckUrl(properties.getProperty(C2_REST_URL_ACK.getKey(), C2_REST_URL_ACK.getDefaultValue()))
.c2RestPathBase(properties.getProperty(C2_REST_PATH_BASE.getKey(), C2_REST_PATH_BASE.getDefaultValue()))
.c2RestPathHeartbeat(properties.getProperty(C2_REST_PATH_HEARTBEAT.getKey(), C2_REST_PATH_HEARTBEAT.getDefaultValue()))
.c2RestPathAcknowledge(properties.getProperty(C2_REST_PATH_ACKNOWLEDGE.getKey(), C2_REST_PATH_ACKNOWLEDGE.getDefaultValue()))
.build();
}

View File

@ -134,8 +134,18 @@ java.arg.14=-Djava.awt.headless=true
# Enabling C2 Uncomment each of the following options
#c2.enable=true
## define protocol parameters
# DEPRECATED: c2.rest.url and c2.rest.url.ack are deprecated in favor of c2.rest.path.* properties and are target to be removed in future release
# The absolute url of the C2 server's heartbeat endpoint, eg.: http://localhost/c2-server/api/heartbeat
#c2.rest.url=
# The absolute url of the C2 server's acknowledge endpoint, eg.: http://localhost/c2-server/api/acknowledge
#c2.rest.url.ack=
# C2 Rest Path Properties
# The base path of the C2 server's REST API, eg.: http://localhost/c2-server/api
#c2.rest.path.base=
# Relative url of the C2 server's heartbeat endpoint, eg.: /heartbeat
#c2.rest.path.heartbeat=
# Relative url of the C2 server's acknowledge endpoint, eg.: /acknowledge
#c2.rest.path.acknowledge=
## c2 timeouts
#c2.rest.connectionTimeout=5 sec
#c2.rest.readTimeout=5 sec