mirror of https://github.com/apache/nifi.git
NIFI-9908 C2 refactor and test coverage improvements
This closes #6149 Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
parent
db11961026
commit
1465c2c629
|
@ -47,5 +47,11 @@ limitations under the License.
|
|||
<groupId>com.squareup.okhttp3</groupId>
|
||||
<artifactId>logging-interceptor</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>com.squareup.okhttp3</groupId>
|
||||
<artifactId>mockwebserver</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</project>
|
||||
|
|
|
@ -91,6 +91,44 @@ public class C2HttpClient implements C2Client {
|
|||
return serializer.serialize(heartbeat).flatMap(this::sendHeartbeat);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Optional<byte[]> retrieveUpdateContent(String flowUpdateUrl) {
|
||||
Optional<byte[]> updateContent = Optional.empty();
|
||||
final Request.Builder requestBuilder = new Request.Builder()
|
||||
.get()
|
||||
.url(flowUpdateUrl);
|
||||
final Request request = requestBuilder.build();
|
||||
|
||||
try (Response response = httpClientReference.get().newCall(request).execute()) {
|
||||
Optional<ResponseBody> body = Optional.ofNullable(response.body());
|
||||
|
||||
if (!response.isSuccessful()) {
|
||||
StringBuilder messageBuilder = new StringBuilder(String.format("Configuration retrieval failed: HTTP %d", response.code()));
|
||||
body.map(Object::toString).ifPresent(messageBuilder::append);
|
||||
throw new C2ServerException(messageBuilder.toString());
|
||||
}
|
||||
|
||||
if (body.isPresent()) {
|
||||
updateContent = Optional.of(body.get().bytes());
|
||||
} else {
|
||||
logger.warn("No body returned when pulling a new configuration");
|
||||
}
|
||||
} catch (Exception e) {
|
||||
logger.warn("Configuration retrieval failed", e);
|
||||
}
|
||||
|
||||
return updateContent;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void acknowledgeOperation(C2OperationAck operationAck) {
|
||||
logger.info("Acknowledging Operation [{}] C2 URL [{}]", operationAck.getOperationId(), clientConfig.getC2AckUrl());
|
||||
serializer.serialize(operationAck)
|
||||
.map(operationAckBody -> RequestBody.create(operationAckBody, MEDIA_TYPE_APPLICATION_JSON))
|
||||
.map(requestBody -> new Request.Builder().post(requestBody).url(clientConfig.getC2AckUrl()).build())
|
||||
.ifPresent(this::sendAck);
|
||||
}
|
||||
|
||||
private Optional<C2HeartbeatResponse> sendHeartbeat(String heartbeat) {
|
||||
Optional<C2HeartbeatResponse> c2HeartbeatResponse = Optional.empty();
|
||||
Request request = new Request.Builder()
|
||||
|
@ -198,44 +236,6 @@ public class C2HttpClient implements C2Client {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Optional<byte[]> retrieveUpdateContent(String flowUpdateUrl) {
|
||||
final Request.Builder requestBuilder = new Request.Builder()
|
||||
.get()
|
||||
.url(flowUpdateUrl);
|
||||
final Request request = requestBuilder.build();
|
||||
|
||||
ResponseBody body;
|
||||
try (final Response response = httpClientReference.get().newCall(request).execute()) {
|
||||
int code = response.code();
|
||||
if (code >= 400) {
|
||||
final String message = String.format("Configuration retrieval failed: HTTP %d %s", code, response.body().string());
|
||||
throw new IOException(message);
|
||||
}
|
||||
|
||||
body = response.body();
|
||||
|
||||
if (body == null) {
|
||||
logger.warn("No body returned when pulling a new configuration");
|
||||
return Optional.empty();
|
||||
}
|
||||
|
||||
return Optional.of(body.bytes());
|
||||
} catch (Exception e) {
|
||||
logger.warn("Configuration retrieval failed", e);
|
||||
return Optional.empty();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void acknowledgeOperation(C2OperationAck operationAck) {
|
||||
logger.info("Performing acknowledgement request to {} for operation {}", clientConfig.getC2AckUrl(), operationAck.getOperationId());
|
||||
serializer.serialize(operationAck)
|
||||
.map(operationAckBody -> RequestBody.create(operationAckBody, MEDIA_TYPE_APPLICATION_JSON))
|
||||
.map(requestBody -> new Request.Builder().post(requestBody).url(clientConfig.getC2AckUrl()).build())
|
||||
.ifPresent(this::sendAck);
|
||||
}
|
||||
|
||||
private void sendAck(Request request) {
|
||||
try(Response heartbeatResponse = httpClientReference.get().newCall(request).execute()) {
|
||||
if (!heartbeatResponse.isSuccessful()) {
|
||||
|
|
|
@ -0,0 +1,29 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.c2.client.http;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* Exception to differentiate C2 specific issues from standard IOExceptions
|
||||
*/
|
||||
public class C2ServerException extends IOException {
|
||||
|
||||
public C2ServerException(String message) {
|
||||
super(message);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,156 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.c2.client.http;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.Optional;
|
||||
import okhttp3.mockwebserver.MockResponse;
|
||||
import okhttp3.mockwebserver.MockWebServer;
|
||||
import okhttp3.mockwebserver.RecordedRequest;
|
||||
import org.apache.nifi.c2.client.C2ClientConfig;
|
||||
import org.apache.nifi.c2.protocol.api.C2Heartbeat;
|
||||
import org.apache.nifi.c2.protocol.api.C2HeartbeatResponse;
|
||||
import org.apache.nifi.c2.protocol.api.C2OperationAck;
|
||||
import org.apache.nifi.c2.serializer.C2Serializer;
|
||||
import org.junit.jupiter.api.AfterEach;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.mockito.InjectMocks;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.junit.jupiter.MockitoExtension;
|
||||
|
||||
@ExtendWith(MockitoExtension.class)
|
||||
public class C2HttpClientTest {
|
||||
|
||||
private static final String HEARTBEAT_PATH = "c2/heartbeat";
|
||||
private static final String UPDATE_PATH = "c2/update";
|
||||
private static final String ACK_PATH = "c2/acknowledge";
|
||||
private static final int HTTP_STATUS_OK = 200;
|
||||
private static final int HTTP_STATUS_BAD_REQUEST = 400;
|
||||
|
||||
@Mock
|
||||
private C2ClientConfig c2ClientConfig;
|
||||
|
||||
@Mock
|
||||
private C2Serializer serializer;
|
||||
|
||||
@InjectMocks
|
||||
private C2HttpClient c2HttpClient;
|
||||
|
||||
private MockWebServer mockWebServer;
|
||||
|
||||
private String baseUrl;
|
||||
|
||||
@BeforeEach
|
||||
public void startServer() {
|
||||
mockWebServer = new MockWebServer();
|
||||
baseUrl = mockWebServer.url("/").newBuilder().host("localhost").build().toString();
|
||||
}
|
||||
|
||||
@AfterEach
|
||||
public void shutdownServer() throws IOException {
|
||||
mockWebServer.shutdown();
|
||||
}
|
||||
|
||||
@Test
|
||||
void testPublishHeartbeatSuccess() throws InterruptedException {
|
||||
C2HeartbeatResponse hbResponse = new C2HeartbeatResponse();
|
||||
mockWebServer.enqueue(new MockResponse().setBody("responseBody"));
|
||||
|
||||
when(serializer.serialize(any(C2Heartbeat.class))).thenReturn(Optional.of("Heartbeat"));
|
||||
when(serializer.deserialize(any(), any())).thenReturn(Optional.of(hbResponse));
|
||||
when(c2ClientConfig.getC2Url()).thenReturn(baseUrl + HEARTBEAT_PATH);
|
||||
|
||||
Optional<C2HeartbeatResponse> response = c2HttpClient.publishHeartbeat(new C2Heartbeat());
|
||||
|
||||
assertTrue(response.isPresent());
|
||||
assertEquals(response.get(), hbResponse);
|
||||
|
||||
RecordedRequest request = mockWebServer.takeRequest();
|
||||
assertEquals("/" + HEARTBEAT_PATH, request.getPath());
|
||||
}
|
||||
|
||||
@Test
|
||||
void testPublishHeartbeatReturnEmptyInCaseOfCommunicationIssue() {
|
||||
when(serializer.serialize(any(C2Heartbeat.class))).thenReturn(Optional.of("Heartbeat"));
|
||||
when(c2ClientConfig.getC2Url()).thenReturn("http://localhost/incorrectPath");
|
||||
|
||||
Optional<C2HeartbeatResponse> response = c2HttpClient.publishHeartbeat(new C2Heartbeat());
|
||||
|
||||
assertFalse(response.isPresent());
|
||||
}
|
||||
|
||||
@Test
|
||||
void testConstructorThrowsExceptionForInvalidKeystoreFilenameAtInitialization() {
|
||||
when(c2ClientConfig.getKeystoreFilename()).thenReturn("incorrectKeystoreFilename");
|
||||
|
||||
IllegalStateException exception = assertThrows(IllegalStateException.class, () -> new C2HttpClient(c2ClientConfig, serializer));
|
||||
|
||||
assertTrue(exception.getMessage().contains("TLS"));
|
||||
}
|
||||
|
||||
@Test
|
||||
void testRetrieveUpdateContentReturnsEmptyWhenServerErrorResponse() throws InterruptedException {
|
||||
mockWebServer.enqueue(new MockResponse().setBody("updateContent").setResponseCode(HTTP_STATUS_BAD_REQUEST));
|
||||
|
||||
Optional<byte[]> response = c2HttpClient.retrieveUpdateContent(baseUrl + UPDATE_PATH);
|
||||
|
||||
assertFalse(response.isPresent());
|
||||
|
||||
RecordedRequest request = mockWebServer.takeRequest();
|
||||
assertEquals("/" + UPDATE_PATH, request.getPath());
|
||||
}
|
||||
|
||||
@Test
|
||||
void testRetrieveUpdateContentReturnsResponseWithBody() throws InterruptedException {
|
||||
String content = "updateContent";
|
||||
mockWebServer.enqueue(new MockResponse().setBody(content).setResponseCode(HTTP_STATUS_OK));
|
||||
|
||||
Optional<byte[]> response = c2HttpClient.retrieveUpdateContent(baseUrl + UPDATE_PATH);
|
||||
|
||||
assertTrue(response.isPresent());
|
||||
assertArrayEquals(content.getBytes(StandardCharsets.UTF_8), response.get());
|
||||
|
||||
RecordedRequest request = mockWebServer.takeRequest();
|
||||
assertEquals("/" + UPDATE_PATH, request.getPath());
|
||||
}
|
||||
|
||||
@Test
|
||||
void testAcknowledgeOperation() throws InterruptedException {
|
||||
String ackContent = "ack";
|
||||
when(c2ClientConfig.getC2AckUrl()).thenReturn(baseUrl + ACK_PATH);
|
||||
when(serializer.serialize(any(C2OperationAck.class))).thenReturn(Optional.of(ackContent));
|
||||
mockWebServer.enqueue(new MockResponse().setResponseCode(HTTP_STATUS_OK));
|
||||
|
||||
c2HttpClient.acknowledgeOperation(new C2OperationAck());
|
||||
|
||||
RecordedRequest request = mockWebServer.takeRequest();
|
||||
assertEquals("/" + ACK_PATH, request.getPath());
|
||||
assertTrue(request.getHeader("Content-Type").contains("application/json"));
|
||||
assertArrayEquals(ackContent.getBytes(StandardCharsets.UTF_8), request.getBody().readByteArray());
|
||||
}
|
||||
}
|
|
@ -16,13 +16,10 @@
|
|||
*/
|
||||
package org.apache.nifi.c2.client.service;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.function.Function;
|
||||
import org.apache.nifi.c2.client.api.C2Client;
|
||||
import org.apache.nifi.c2.client.service.model.RuntimeInfoWrapper;
|
||||
import org.apache.nifi.c2.client.service.operation.C2OperationService;
|
||||
import org.apache.nifi.c2.client.service.operation.UpdateConfigurationOperationHandler;
|
||||
import org.apache.nifi.c2.protocol.api.C2Heartbeat;
|
||||
import org.apache.nifi.c2.protocol.api.C2HeartbeatResponse;
|
||||
import org.apache.nifi.c2.protocol.api.C2Operation;
|
||||
|
@ -36,13 +33,11 @@ public class C2ClientService {
|
|||
private final C2Client client;
|
||||
private final C2HeartbeatFactory c2HeartbeatFactory;
|
||||
private final C2OperationService operationService;
|
||||
private final UpdateConfigurationOperationHandler updateConfigurationOperationHandler;
|
||||
|
||||
public C2ClientService(C2Client client, C2HeartbeatFactory c2HeartbeatFactory, FlowIdHolder flowIdHolder, Function<byte[], Boolean> updateFlow) {
|
||||
public C2ClientService(C2Client client, C2HeartbeatFactory c2HeartbeatFactory, C2OperationService operationService) {
|
||||
this.client = client;
|
||||
this.c2HeartbeatFactory = c2HeartbeatFactory;
|
||||
this.updateConfigurationOperationHandler = new UpdateConfigurationOperationHandler(client, flowIdHolder, updateFlow);
|
||||
this.operationService = new C2OperationService(Arrays.asList(updateConfigurationOperationHandler));
|
||||
this.operationService = operationService;
|
||||
}
|
||||
|
||||
public void sendHeartbeat(RuntimeInfoWrapper runtimeInfoWrapper) {
|
||||
|
|
|
@ -23,6 +23,8 @@ import static org.apache.nifi.c2.protocol.api.OperationType.UPDATE;
|
|||
import java.net.URI;
|
||||
import java.util.Optional;
|
||||
import java.util.function.Function;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
import org.apache.nifi.c2.client.api.C2Client;
|
||||
import org.apache.nifi.c2.client.service.FlowIdHolder;
|
||||
import org.apache.nifi.c2.protocol.api.C2Operation;
|
||||
|
@ -36,8 +38,9 @@ import org.slf4j.LoggerFactory;
|
|||
public class UpdateConfigurationOperationHandler implements C2OperationHandler {
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(UpdateConfigurationOperationHandler.class);
|
||||
private static final Pattern FLOW_ID_PATTERN = Pattern.compile("/[^/]+?/[^/]+?/[^/]+?/([^/]+)?/?.*");
|
||||
|
||||
private static final String LOCATION = "location";
|
||||
static final String LOCATION = "location";
|
||||
|
||||
private final C2Client client;
|
||||
private final Function<byte[], Boolean> updateFlow;
|
||||
|
@ -101,10 +104,10 @@ public class UpdateConfigurationOperationHandler implements C2OperationHandler {
|
|||
private String parseFlowId(String flowUpdateUrl) {
|
||||
try {
|
||||
URI flowUri = new URI(flowUpdateUrl);
|
||||
String flowUriPath = flowUri.getPath();
|
||||
String[] split = flowUriPath.split("/");
|
||||
if (split.length > 4) {
|
||||
return split[4];
|
||||
Matcher matcher = FLOW_ID_PATTERN.matcher(flowUri.getPath());
|
||||
|
||||
if (matcher.matches()) {
|
||||
return matcher.group(1);
|
||||
} else {
|
||||
throw new IllegalArgumentException(String.format("Flow Update URL format unexpected [%s]", flowUpdateUrl));
|
||||
}
|
||||
|
|
|
@ -0,0 +1,146 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.c2.client.service;
|
||||
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.times;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.IntStream;
|
||||
import org.apache.nifi.c2.client.api.C2Client;
|
||||
import org.apache.nifi.c2.client.service.model.RuntimeInfoWrapper;
|
||||
import org.apache.nifi.c2.client.service.operation.C2OperationService;
|
||||
import org.apache.nifi.c2.protocol.api.C2Heartbeat;
|
||||
import org.apache.nifi.c2.protocol.api.C2HeartbeatResponse;
|
||||
import org.apache.nifi.c2.protocol.api.C2Operation;
|
||||
import org.apache.nifi.c2.protocol.api.C2OperationAck;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.mockito.InjectMocks;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.junit.jupiter.MockitoExtension;
|
||||
|
||||
@ExtendWith(MockitoExtension.class)
|
||||
public class C2ClientServiceTest {
|
||||
|
||||
@Mock
|
||||
private C2Client client;
|
||||
|
||||
@Mock
|
||||
private C2HeartbeatFactory c2HeartbeatFactory;
|
||||
|
||||
@Mock
|
||||
private C2OperationService operationService;
|
||||
|
||||
@Mock
|
||||
private RuntimeInfoWrapper runtimeInfoWrapper;
|
||||
|
||||
@InjectMocks
|
||||
private C2ClientService c2ClientService;
|
||||
|
||||
@Test
|
||||
void testSendHeartbeatAndAckWhenOperationPresent() {
|
||||
C2Heartbeat heartbeat = mock(C2Heartbeat.class);
|
||||
when(c2HeartbeatFactory.create(any())).thenReturn(heartbeat);
|
||||
C2HeartbeatResponse hbResponse = new C2HeartbeatResponse();
|
||||
hbResponse.setRequestedOperations(generateOperation(1));
|
||||
when(client.publishHeartbeat(heartbeat)).thenReturn(Optional.of(hbResponse));
|
||||
when(operationService.handleOperation(any())).thenReturn(Optional.of(new C2OperationAck()));
|
||||
|
||||
c2ClientService.sendHeartbeat(runtimeInfoWrapper);
|
||||
|
||||
verify(c2HeartbeatFactory).create(any());
|
||||
verify(client).publishHeartbeat(heartbeat);
|
||||
verify(operationService).handleOperation(any());
|
||||
verify(client).acknowledgeOperation(any());
|
||||
}
|
||||
|
||||
@Test
|
||||
void testSendHeartbeatAndAckForMultipleOperationPresent() {
|
||||
int operationNum = 5;
|
||||
C2Heartbeat heartbeat = mock(C2Heartbeat.class);
|
||||
when(c2HeartbeatFactory.create(any())).thenReturn(heartbeat);
|
||||
C2HeartbeatResponse hbResponse = new C2HeartbeatResponse();
|
||||
hbResponse.setRequestedOperations(generateOperation(operationNum));
|
||||
when(client.publishHeartbeat(heartbeat)).thenReturn(Optional.of(hbResponse));
|
||||
when(operationService.handleOperation(any())).thenReturn(Optional.of(new C2OperationAck()));
|
||||
|
||||
c2ClientService.sendHeartbeat(runtimeInfoWrapper);
|
||||
|
||||
verify(c2HeartbeatFactory).create(any());
|
||||
verify(client).publishHeartbeat(heartbeat);
|
||||
verify(operationService, times(operationNum)).handleOperation(any());
|
||||
verify(client, times(operationNum)).acknowledgeOperation(any());
|
||||
}
|
||||
|
||||
@Test
|
||||
void testSendHeartbeatHandlesNoHeartbeatResponse() {
|
||||
C2Heartbeat heartbeat = mock(C2Heartbeat.class);
|
||||
when(c2HeartbeatFactory.create(any())).thenReturn(heartbeat);
|
||||
when(client.publishHeartbeat(heartbeat)).thenReturn(Optional.empty());
|
||||
|
||||
c2ClientService.sendHeartbeat(runtimeInfoWrapper);
|
||||
|
||||
verify(c2HeartbeatFactory).create(any());
|
||||
verify(client).publishHeartbeat(heartbeat);
|
||||
verify(operationService, times(0)).handleOperation(any());
|
||||
verify(client, times(0)).acknowledgeOperation(any());
|
||||
}
|
||||
|
||||
@Test
|
||||
void testSendHeartbeatNotHandledWhenThereAreNoOperationsSent() {
|
||||
C2Heartbeat heartbeat = mock(C2Heartbeat.class);
|
||||
when(c2HeartbeatFactory.create(any())).thenReturn(heartbeat);
|
||||
C2HeartbeatResponse hbResponse = new C2HeartbeatResponse();
|
||||
when(client.publishHeartbeat(heartbeat)).thenReturn(Optional.of(hbResponse));
|
||||
|
||||
c2ClientService.sendHeartbeat(runtimeInfoWrapper);
|
||||
|
||||
verify(c2HeartbeatFactory).create(any());
|
||||
verify(client).publishHeartbeat(heartbeat);
|
||||
verify(operationService, times(0)).handleOperation(any());
|
||||
verify(client, times(0)).acknowledgeOperation(any());
|
||||
}
|
||||
|
||||
@Test
|
||||
void testSendHeartbeatNotAckWhenOperationAckMissing() {
|
||||
C2Heartbeat heartbeat = mock(C2Heartbeat.class);
|
||||
when(c2HeartbeatFactory.create(any())).thenReturn(heartbeat);
|
||||
C2HeartbeatResponse hbResponse = new C2HeartbeatResponse();
|
||||
hbResponse.setRequestedOperations(generateOperation(1));
|
||||
when(client.publishHeartbeat(heartbeat)).thenReturn(Optional.of(hbResponse));
|
||||
when(operationService.handleOperation(any())).thenReturn(Optional.empty());
|
||||
|
||||
c2ClientService.sendHeartbeat(runtimeInfoWrapper);
|
||||
|
||||
verify(c2HeartbeatFactory).create(any());
|
||||
verify(client).publishHeartbeat(heartbeat);
|
||||
verify(operationService).handleOperation(any());
|
||||
verify(client, times(0)).acknowledgeOperation(any());
|
||||
}
|
||||
|
||||
private List<C2Operation> generateOperation(int num) {
|
||||
return IntStream.range(0, num)
|
||||
.mapToObj(x -> new C2Operation())
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
}
|
|
@ -0,0 +1,103 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.c2.client.service;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.io.File;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import org.apache.nifi.c2.client.C2ClientConfig;
|
||||
import org.apache.nifi.c2.client.service.model.RuntimeInfoWrapper;
|
||||
import org.apache.nifi.c2.protocol.api.AgentRepositories;
|
||||
import org.apache.nifi.c2.protocol.api.C2Heartbeat;
|
||||
import org.apache.nifi.c2.protocol.api.FlowQueueStatus;
|
||||
import org.apache.nifi.c2.protocol.component.api.RuntimeManifest;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.junit.jupiter.api.io.TempDir;
|
||||
import org.mockito.InjectMocks;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.junit.jupiter.MockitoExtension;
|
||||
|
||||
@ExtendWith(MockitoExtension.class)
|
||||
public class C2HeartbeatFactoryTest {
|
||||
|
||||
private static final String AGENT_CLASS = "agentClass";
|
||||
private static final String FLOW_ID = "flowId";
|
||||
|
||||
@Mock
|
||||
private C2ClientConfig clientConfig;
|
||||
|
||||
@Mock
|
||||
private FlowIdHolder flowIdHolder;
|
||||
|
||||
@InjectMocks
|
||||
private C2HeartbeatFactory c2HeartbeatFactory;
|
||||
|
||||
@TempDir
|
||||
private File tempDir;
|
||||
|
||||
@BeforeEach
|
||||
public void setup() {
|
||||
when(clientConfig.getConfDirectory()).thenReturn(tempDir.getAbsolutePath());
|
||||
}
|
||||
|
||||
@Test
|
||||
void testCreateHeartbeat() {
|
||||
when(flowIdHolder.getFlowId()).thenReturn(FLOW_ID);
|
||||
when(clientConfig.getAgentClass()).thenReturn(AGENT_CLASS);
|
||||
|
||||
C2Heartbeat heartbeat = c2HeartbeatFactory.create(mock(RuntimeInfoWrapper.class));
|
||||
|
||||
assertEquals(FLOW_ID, heartbeat.getFlowId());
|
||||
assertEquals(AGENT_CLASS, heartbeat.getAgentClass());
|
||||
}
|
||||
|
||||
@Test
|
||||
void testCreateGeneratesAgentAndDeviceIdIfNotPresent() {
|
||||
C2Heartbeat heartbeat = c2HeartbeatFactory.create(mock(RuntimeInfoWrapper.class));
|
||||
|
||||
assertNotNull(heartbeat.getAgentId());
|
||||
assertNotNull(heartbeat.getDeviceId());
|
||||
}
|
||||
|
||||
@Test
|
||||
void testCreatePopulatesFromRuntimeInfoWrapper() {
|
||||
AgentRepositories repos = new AgentRepositories();
|
||||
RuntimeManifest manifest = new RuntimeManifest();
|
||||
Map<String, FlowQueueStatus> queueStatus = new HashMap<>();
|
||||
|
||||
C2Heartbeat heartbeat = c2HeartbeatFactory.create(new RuntimeInfoWrapper(repos, manifest, queueStatus));
|
||||
|
||||
assertEquals(repos, heartbeat.getAgentInfo().getStatus().getRepositories());
|
||||
assertEquals(manifest, heartbeat.getAgentInfo().getAgentManifest());
|
||||
assertEquals(queueStatus, heartbeat.getFlowInfo().getQueues());
|
||||
}
|
||||
|
||||
@Test
|
||||
void testCreateThrowsExceptionWhenConfDirNotSet() {
|
||||
when(clientConfig.getConfDirectory()).thenReturn(String.class.getSimpleName());
|
||||
|
||||
assertThrows(IllegalStateException.class, () -> c2HeartbeatFactory.create(mock(RuntimeInfoWrapper.class)));
|
||||
}
|
||||
}
|
|
@ -0,0 +1,114 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.c2.client.service.operation;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.Optional;
|
||||
import org.apache.nifi.c2.protocol.api.C2Operation;
|
||||
import org.apache.nifi.c2.protocol.api.C2OperationAck;
|
||||
import org.apache.nifi.c2.protocol.api.OperandType;
|
||||
import org.apache.nifi.c2.protocol.api.OperationType;
|
||||
import org.junit.jupiter.api.BeforeAll;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
public class C2OperationServiceTest {
|
||||
|
||||
private static C2OperationAck operationAck;
|
||||
|
||||
@BeforeAll
|
||||
public static void setup(){
|
||||
operationAck = new C2OperationAck();
|
||||
operationAck.setOperationId("12345");
|
||||
}
|
||||
|
||||
@Test
|
||||
void testHandleOperationReturnsEmptyForUnrecognisedOperationType() {
|
||||
C2OperationService service = new C2OperationService(Collections.emptyList());
|
||||
|
||||
C2Operation operation = new C2Operation();
|
||||
operation.setOperation(OperationType.UPDATE);
|
||||
operation.setOperand(OperandType.CONFIGURATION);
|
||||
Optional<C2OperationAck> ack = service.handleOperation(operation);
|
||||
|
||||
assertFalse(ack.isPresent());
|
||||
}
|
||||
|
||||
@Test
|
||||
void testHandleOperation() {
|
||||
C2OperationService service = new C2OperationService(Collections.singletonList(new TestDescribeOperationHandler()));
|
||||
|
||||
C2Operation operation = new C2Operation();
|
||||
operation.setOperation(OperationType.DESCRIBE);
|
||||
operation.setOperand(OperandType.MANIFEST);
|
||||
Optional<C2OperationAck> ack = service.handleOperation(operation);
|
||||
|
||||
assertTrue(ack.isPresent());
|
||||
assertEquals(operationAck, ack.get());
|
||||
}
|
||||
|
||||
@Test
|
||||
void testHandleOperationReturnsEmptyForOperandMismatch() {
|
||||
C2OperationService service = new C2OperationService(Collections.singletonList(new TestInvalidOperationHandler()));
|
||||
|
||||
C2Operation operation = new C2Operation();
|
||||
operation.setOperation(OperationType.DESCRIBE);
|
||||
operation.setOperand(OperandType.MANIFEST);
|
||||
Optional<C2OperationAck> ack = service.handleOperation(operation);
|
||||
|
||||
assertFalse(ack.isPresent());
|
||||
}
|
||||
|
||||
private static class TestDescribeOperationHandler implements C2OperationHandler {
|
||||
|
||||
@Override
|
||||
public OperationType getOperationType() {
|
||||
return OperationType.DESCRIBE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public OperandType getOperandType() {
|
||||
return OperandType.MANIFEST;
|
||||
}
|
||||
|
||||
@Override
|
||||
public C2OperationAck handle(C2Operation operation) {
|
||||
return operationAck;
|
||||
}
|
||||
}
|
||||
|
||||
private static class TestInvalidOperationHandler implements C2OperationHandler {
|
||||
|
||||
@Override
|
||||
public OperationType getOperationType() {
|
||||
return OperationType.DESCRIBE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public OperandType getOperandType() {
|
||||
return OperandType.CONFIGURATION;
|
||||
}
|
||||
|
||||
@Override
|
||||
public C2OperationAck handle(C2Operation operation) {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,119 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.c2.client.service.operation;
|
||||
|
||||
import static org.apache.commons.lang3.StringUtils.EMPTY;
|
||||
import static org.apache.nifi.c2.client.service.operation.UpdateConfigurationOperationHandler.LOCATION;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.function.Function;
|
||||
import org.apache.nifi.c2.client.api.C2Client;
|
||||
import org.apache.nifi.c2.client.service.FlowIdHolder;
|
||||
import org.apache.nifi.c2.protocol.api.C2Operation;
|
||||
import org.apache.nifi.c2.protocol.api.C2OperationAck;
|
||||
import org.apache.nifi.c2.protocol.api.C2OperationState;
|
||||
import org.apache.nifi.c2.protocol.api.OperandType;
|
||||
import org.apache.nifi.c2.protocol.api.OperationType;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.junit.jupiter.MockitoExtension;
|
||||
|
||||
@ExtendWith(MockitoExtension.class)
|
||||
public class UpdateConfigurationOperationHandlerTest {
|
||||
|
||||
private static final String FLOW_ID = "flowId";
|
||||
private static final String OPERATION_ID = "operationId";
|
||||
private static final Map<String, String> CORRECT_LOCATION_MAP = Collections.singletonMap(LOCATION, "/path/for/the/" + FLOW_ID);
|
||||
private static final Map<String, String> INCORRECT_LOCATION_MAP = Collections.singletonMap(LOCATION, "incorrect/location");
|
||||
|
||||
@Mock
|
||||
private C2Client client;
|
||||
|
||||
@Mock
|
||||
private FlowIdHolder flowIdHolder;
|
||||
|
||||
@Test
|
||||
void testUpdateConfigurationOperationHandlerCreateSuccess() {
|
||||
UpdateConfigurationOperationHandler handler = new UpdateConfigurationOperationHandler(null, null, null);
|
||||
|
||||
assertEquals(OperationType.UPDATE, handler.getOperationType());
|
||||
assertEquals(OperandType.CONFIGURATION, handler.getOperandType());
|
||||
}
|
||||
|
||||
@Test
|
||||
void testHandleThrowsExceptionForIncorrectArg() {
|
||||
UpdateConfigurationOperationHandler handler = new UpdateConfigurationOperationHandler(null, null, null);
|
||||
C2Operation operation = new C2Operation();
|
||||
operation.setArgs(INCORRECT_LOCATION_MAP);
|
||||
|
||||
IllegalStateException exception = assertThrows(IllegalStateException.class, () -> handler.handle(operation));
|
||||
}
|
||||
|
||||
@Test
|
||||
void testHandleReturnsNotAppliedWithNoContent() {
|
||||
when(flowIdHolder.getFlowId()).thenReturn(FLOW_ID);
|
||||
when(client.retrieveUpdateContent(any())).thenReturn(Optional.empty());
|
||||
UpdateConfigurationOperationHandler handler = new UpdateConfigurationOperationHandler(client, flowIdHolder, null);
|
||||
C2Operation operation = new C2Operation();
|
||||
operation.setArgs(CORRECT_LOCATION_MAP);
|
||||
|
||||
C2OperationAck response = handler.handle(operation);
|
||||
|
||||
assertEquals(EMPTY, response.getOperationId());
|
||||
assertEquals(C2OperationState.OperationState.NOT_APPLIED, response.getOperationState().getState());
|
||||
}
|
||||
|
||||
@Test
|
||||
void testHandleReturnsNotAppliedWithContentApplyIssues() {
|
||||
Function<byte[], Boolean> failedToUpdate = x -> false;
|
||||
when(flowIdHolder.getFlowId()).thenReturn(FLOW_ID);
|
||||
when(client.retrieveUpdateContent(any())).thenReturn(Optional.of("content".getBytes()));
|
||||
UpdateConfigurationOperationHandler handler = new UpdateConfigurationOperationHandler(client, flowIdHolder, failedToUpdate);
|
||||
C2Operation operation = new C2Operation();
|
||||
operation.setIdentifier(OPERATION_ID);
|
||||
operation.setArgs(CORRECT_LOCATION_MAP);
|
||||
|
||||
C2OperationAck response = handler.handle(operation);
|
||||
|
||||
assertEquals(OPERATION_ID, response.getOperationId());
|
||||
assertEquals(C2OperationState.OperationState.NOT_APPLIED, response.getOperationState().getState());
|
||||
}
|
||||
|
||||
@Test
|
||||
void testHandleReturnsFullyApplied() {
|
||||
Function<byte[], Boolean> successUpdate = x -> true;
|
||||
when(flowIdHolder.getFlowId()).thenReturn(FLOW_ID);
|
||||
when(client.retrieveUpdateContent(any())).thenReturn(Optional.of("content".getBytes()));
|
||||
UpdateConfigurationOperationHandler handler = new UpdateConfigurationOperationHandler(client, flowIdHolder, successUpdate);
|
||||
C2Operation operation = new C2Operation();
|
||||
operation.setIdentifier(OPERATION_ID);
|
||||
operation.setArgs(CORRECT_LOCATION_MAP);
|
||||
|
||||
C2OperationAck response = handler.handle(operation);
|
||||
|
||||
assertEquals(OPERATION_ID, response.getOperationId());
|
||||
assertEquals(C2OperationState.OperationState.FULLY_APPLIED, response.getOperationState().getState());
|
||||
}
|
||||
|
||||
}
|
|
@ -21,6 +21,7 @@ import java.io.IOException;
|
|||
import java.nio.charset.StandardCharsets;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.util.Arrays;
|
||||
import java.util.Optional;
|
||||
import org.apache.nifi.c2.client.C2ClientConfig;
|
||||
import org.apache.nifi.c2.client.http.C2HttpClient;
|
||||
|
@ -28,6 +29,8 @@ import org.apache.nifi.c2.client.service.C2ClientService;
|
|||
import org.apache.nifi.c2.client.service.C2HeartbeatFactory;
|
||||
import org.apache.nifi.c2.client.service.FlowIdHolder;
|
||||
import org.apache.nifi.c2.client.service.model.RuntimeInfoWrapper;
|
||||
import org.apache.nifi.c2.client.service.operation.C2OperationService;
|
||||
import org.apache.nifi.c2.client.service.operation.UpdateConfigurationOperationHandler;
|
||||
import org.apache.nifi.c2.protocol.api.AgentRepositories;
|
||||
import org.apache.nifi.c2.protocol.api.AgentRepositoryStatus;
|
||||
import org.apache.nifi.c2.protocol.api.FlowQueueStatus;
|
||||
|
@ -42,7 +45,6 @@ import org.apache.nifi.extension.manifest.parser.jaxb.JAXBExtensionManifestParse
|
|||
import org.apache.nifi.manifest.RuntimeManifestService;
|
||||
import org.apache.nifi.manifest.StandardRuntimeManifestService;
|
||||
import org.apache.nifi.nar.ExtensionManagerHolder;
|
||||
import org.apache.nifi.services.FlowService;
|
||||
import org.apache.nifi.util.FormatUtils;
|
||||
import org.apache.nifi.util.NiFiProperties;
|
||||
import org.slf4j.Logger;
|
||||
|
@ -66,7 +68,6 @@ public class C2NifiClientService {
|
|||
|
||||
private final C2ClientService c2ClientService;
|
||||
|
||||
private final FlowService flowService;
|
||||
private final FlowController flowController;
|
||||
private final String propertiesDir;
|
||||
private final ScheduledThreadPoolExecutor scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
|
||||
|
@ -75,7 +76,7 @@ public class C2NifiClientService {
|
|||
private final RuntimeManifestService runtimeManifestService;
|
||||
private final long heartbeatPeriod;
|
||||
|
||||
public C2NifiClientService(final NiFiProperties niFiProperties, final FlowService flowService, final FlowController flowController) {
|
||||
public C2NifiClientService(final NiFiProperties niFiProperties, final FlowController flowController) {
|
||||
C2ClientConfig clientConfig = generateClientConfig(niFiProperties);
|
||||
FlowIdHolder flowIdHolder = new FlowIdHolder(clientConfig.getConfDirectory());
|
||||
this.propertiesDir = niFiProperties.getProperty(NiFiProperties.PROPERTIES_FILE_PATH, null);
|
||||
|
@ -86,13 +87,12 @@ public class C2NifiClientService {
|
|||
clientConfig.getRuntimeType()
|
||||
);
|
||||
this.heartbeatPeriod = clientConfig.getHeartbeatPeriod();
|
||||
this.flowService = flowService;
|
||||
this.flowController = flowController;
|
||||
C2HttpClient client = new C2HttpClient(clientConfig, new C2JacksonSerializer());
|
||||
this.c2ClientService = new C2ClientService(
|
||||
new C2HttpClient(clientConfig, new C2JacksonSerializer()),
|
||||
client,
|
||||
new C2HeartbeatFactory(clientConfig, flowIdHolder),
|
||||
flowIdHolder,
|
||||
this::updateFlowContent
|
||||
new C2OperationService(Arrays.asList(new UpdateConfigurationOperationHandler(client, flowIdHolder, this::updateFlowContent)))
|
||||
);
|
||||
}
|
||||
|
||||
|
|
|
@ -300,7 +300,7 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
|
|||
final boolean c2Enabled = Boolean.parseBoolean(nifiProperties.getProperty(C2NiFiProperties.C2_ENABLE_KEY, "false"));
|
||||
if (c2Enabled) {
|
||||
logger.info("C2 enabled, creating a C2 client instance");
|
||||
c2NifiClientService = new C2NifiClientService(nifiProperties, this, this.controller);
|
||||
c2NifiClientService = new C2NifiClientService(nifiProperties, this.controller);
|
||||
c2NifiClientService.start();
|
||||
} else {
|
||||
logger.info("C2 Property [{}] missing or disabled: C2 client not created", C2NiFiProperties.C2_ENABLE_KEY);
|
||||
|
|
Loading…
Reference in New Issue