From 3aa4ff9d5663c131ccf7d18bbed69b51cbd72707 Mon Sep 17 00:00:00 2001 From: Ferenc Kis Date: Wed, 15 May 2024 14:19:46 +0200 Subject: [PATCH] NIFI-13242 MiNiFi Sync Resource C2 command Signed-off-by: Ferenc Erdei This closes #8898. --- .../apache/nifi/c2/client/api/C2Client.java | 13 + .../c2/serializer/C2JacksonSerializer.java | 33 +- .../nifi/c2/serializer/C2Serializer.java | 20 +- .../apache/nifi/c2/util/Preconditions.java | 30 ++ .../nifi/c2/client/http/C2HttpClient.java | 39 +- .../nifi/c2/client/http/C2HttpClientTest.java | 2 +- .../c2/client/service/C2HeartbeatFactory.java | 12 +- .../service/operation/C2OperationHandler.java | 64 +++ .../DescribeManifestOperationHandler.java | 32 +- .../SyncResourceOperationHandler.java | 119 +++++ .../operation/SyncResourceStrategy.java | 35 ++ .../TransferDebugOperationHandler.java | 46 +- .../UpdateAssetOperationHandler.java | 61 +-- .../UpdateConfigurationOperationHandler.java | 49 +- .../UpdatePropertiesOperationHandler.java | 29 +- .../service/C2HeartbeatFactoryTest.java | 25 +- .../SyncResourceOperationHandlerTest.java | 167 +++++++ .../UpdateAssetOperationHandlerTest.java | 2 +- ...dateConfigurationOperationHandlerTest.java | 6 +- .../UpdatePropertiesOperationHandlerTest.java | 4 +- .../nifi/c2/protocol/api/C2Heartbeat.java | 11 +- .../nifi/c2/protocol/api/C2Operation.java | 6 +- .../nifi/c2/protocol/api/C2OperationAck.java | 19 +- .../nifi/c2/protocol/api/OperandType.java | 3 +- .../nifi/c2/protocol/api/OperationType.java | 2 + .../nifi/c2/protocol/api/ResourceInfo.java | 62 +++ .../nifi/c2/protocol/api/ResourceItem.java | 132 +++++ .../nifi/c2/protocol/api/ResourceType.java | 23 + .../c2/protocol/api/ResourcesGlobalHash.java | 73 +++ .../nifi/minifi/c2/C2NifiClientService.java | 23 +- .../c2/command/PropertiesPersister.java | 12 +- .../DefaultSyncResourceStrategy.java | 161 +++++++ .../syncresource/FileResourceRepository.java | 229 +++++++++ .../syncresource/ResourceRepository.java | 41 ++ .../DefaultSyncResourceStrategyTest.java | 312 ++++++++++++ .../FileResourceRepositoryTest.java | 450 ++++++++++++++++++ .../src/main/resources/conf/bootstrap.conf | 10 - 37 files changed, 2160 insertions(+), 197 deletions(-) create mode 100644 c2/c2-client-bundle/c2-client-base/src/main/java/org/apache/nifi/c2/util/Preconditions.java create mode 100644 c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/SyncResourceOperationHandler.java create mode 100644 c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/SyncResourceStrategy.java create mode 100644 c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/SyncResourceOperationHandlerTest.java create mode 100644 c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/ResourceInfo.java create mode 100644 c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/ResourceItem.java create mode 100644 c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/ResourceType.java create mode 100644 c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/ResourcesGlobalHash.java create mode 100644 minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/command/syncresource/DefaultSyncResourceStrategy.java create mode 100644 minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/command/syncresource/FileResourceRepository.java create mode 100644 minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/command/syncresource/ResourceRepository.java create mode 100644 minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/test/java/org/apache/nifi/minifi/c2/command/syncresource/DefaultSyncResourceStrategyTest.java create mode 100644 minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/test/java/org/apache/nifi/minifi/c2/command/syncresource/FileResourceRepositoryTest.java diff --git a/c2/c2-client-bundle/c2-client-api/src/main/java/org/apache/nifi/c2/client/api/C2Client.java b/c2/c2-client-bundle/c2-client-api/src/main/java/org/apache/nifi/c2/client/api/C2Client.java index de90b1b0cd..ad46e2351c 100644 --- a/c2/c2-client-bundle/c2-client-api/src/main/java/org/apache/nifi/c2/client/api/C2Client.java +++ b/c2/c2-client-bundle/c2-client-api/src/main/java/org/apache/nifi/c2/client/api/C2Client.java @@ -17,7 +17,10 @@ package org.apache.nifi.c2.client.api; +import java.io.InputStream; +import java.nio.file.Path; import java.util.Optional; +import java.util.function.Function; import org.apache.nifi.c2.protocol.api.C2Heartbeat; import org.apache.nifi.c2.protocol.api.C2HeartbeatResponse; import org.apache.nifi.c2.protocol.api.C2OperationAck; @@ -58,6 +61,16 @@ public interface C2Client { */ Optional retrieveUpdateAssetContent(String callbackUrl); + /** + * Retrieves a resource from the C2 server. The resource is not materialized into a byte[], + * instead a consumer is provided to stream the data to a specified location + * + * @param callbackUrl url where the resource should be downloaded from + * @param resourceConsumer consumer to handle the incoming data as a stream + * @return the path of the downloaded resource. Will be empty if no content can be downloaded or an error occurred + */ + Optional retrieveResourceItem(String callbackUrl, Function> resourceConsumer); + /** * Uploads a binary bundle to C2 server * diff --git a/c2/c2-client-bundle/c2-client-base/src/main/java/org/apache/nifi/c2/serializer/C2JacksonSerializer.java b/c2/c2-client-bundle/c2-client-base/src/main/java/org/apache/nifi/c2/serializer/C2JacksonSerializer.java index 8fb81a6f9e..8347f4b0b6 100644 --- a/c2/c2-client-bundle/c2-client-base/src/main/java/org/apache/nifi/c2/serializer/C2JacksonSerializer.java +++ b/c2/c2-client-bundle/c2-client-base/src/main/java/org/apache/nifi/c2/serializer/C2JacksonSerializer.java @@ -14,15 +14,19 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.nifi.c2.serializer; +import static java.util.Optional.empty; +import static java.util.Optional.ofNullable; + import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.ObjectMapper; -import java.util.Optional; - import com.fasterxml.jackson.databind.module.SimpleModule; +import java.util.Optional; import org.apache.nifi.c2.protocol.api.OperandType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,7 +52,7 @@ public class C2JacksonSerializer implements C2Serializer { public Optional serialize(T object) { if (object == null) { logger.trace("C2 Object was null. Nothing to serialize. Returning empty."); - return Optional.empty(); + return empty(); } String contentString = null; @@ -58,14 +62,14 @@ public class C2JacksonSerializer implements C2Serializer { logger.error("Object serialization to JSON failed", e); } - return Optional.ofNullable(contentString); + return ofNullable(contentString); } @Override public Optional deserialize(String content, Class valueType) { if (content == null) { - logger.trace("Content for deserialization was null. Returning empty."); - return Optional.empty(); + logger.trace("Content for deserialization was null. Returning empty"); + return empty(); } T responseObject = null; @@ -75,6 +79,21 @@ public class C2JacksonSerializer implements C2Serializer { logger.error("Object deserialization from JSON failed", e); } - return Optional.ofNullable(responseObject); + return ofNullable(responseObject); + } + + @Override + public Optional convert(Object content, TypeReference valueType) { + if (content == null) { + logger.trace("Content for conversion was null. Returning empty"); + return empty(); + } + + try { + return ofNullable(objectMapper.convertValue(content, valueType)); + } catch (IllegalArgumentException e) { + logger.error("Object conversion failed", e); + return empty(); + } } } diff --git a/c2/c2-client-bundle/c2-client-base/src/main/java/org/apache/nifi/c2/serializer/C2Serializer.java b/c2/c2-client-bundle/c2-client-base/src/main/java/org/apache/nifi/c2/serializer/C2Serializer.java index c08d07d399..ce74f93437 100644 --- a/c2/c2-client-bundle/c2-client-base/src/main/java/org/apache/nifi/c2/serializer/C2Serializer.java +++ b/c2/c2-client-bundle/c2-client-base/src/main/java/org/apache/nifi/c2/serializer/C2Serializer.java @@ -14,8 +14,10 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.nifi.c2.serializer; +import com.fasterxml.jackson.core.type.TypeReference; import java.util.Optional; /** @@ -27,18 +29,28 @@ public interface C2Serializer { * Helper to serialise object * * @param content object to be serialised - * @param the type of the object + * @param the type of the object * @return the serialised string representation of the parameter object if it was successful empty otherwise */ Optional serialize(T content); /** - * Helper to deserialise an object + * Helper to deserialize an object * - * @param content the string representation of the object to be deserialsed + * @param content the string representation of the object to be deserialized * @param valueType the class of the target object - * @param the type of the target object + * @param the type of the target object * @return the deserialised object if successful empty otherwise */ Optional deserialize(String content, Class valueType); + + /** + * Helper to convert a JSON object into a specific type + * + * @param content the string representation of the object to be deserialized + * @param valueType the class of the target object + * @param the type of the target object + * @return the converted object if the conversion was successful empty otherwise + */ + Optional convert(Object content, TypeReference valueType); } diff --git a/c2/c2-client-bundle/c2-client-base/src/main/java/org/apache/nifi/c2/util/Preconditions.java b/c2/c2-client-bundle/c2-client-base/src/main/java/org/apache/nifi/c2/util/Preconditions.java new file mode 100644 index 0000000000..f4140bc4a4 --- /dev/null +++ b/c2/c2-client-bundle/c2-client-base/src/main/java/org/apache/nifi/c2/util/Preconditions.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.c2.util; + +public abstract class Preconditions { + + private Preconditions() { + } + + public static void requires(boolean criterion, String exceptionMessage) { + if (!criterion) { + throw new IllegalArgumentException(exceptionMessage); + } + } +} diff --git a/c2/c2-client-bundle/c2-client-http/src/main/java/org/apache/nifi/c2/client/http/C2HttpClient.java b/c2/c2-client-bundle/c2-client-http/src/main/java/org/apache/nifi/c2/client/http/C2HttpClient.java index 5aad04d114..beff8d171a 100644 --- a/c2/c2-client-bundle/c2-client-http/src/main/java/org/apache/nifi/c2/client/http/C2HttpClient.java +++ b/c2/c2-client-bundle/c2-client-http/src/main/java/org/apache/nifi/c2/client/http/C2HttpClient.java @@ -17,12 +17,18 @@ package org.apache.nifi.c2.client.http; +import static java.lang.String.format; +import static java.util.Optional.empty; +import static java.util.Optional.ofNullable; import static okhttp3.MultipartBody.FORM; import java.io.IOException; +import java.io.InputStream; +import java.nio.file.Path; import java.util.Map; import java.util.Optional; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; import okhttp3.Headers; import okhttp3.MediaType; import okhttp3.MultipartBody; @@ -95,6 +101,27 @@ public class C2HttpClient implements C2Client { return retrieveContent(callbackUrl, Map.of()); } + @Override + public Optional retrieveResourceItem(String callbackUrl, Function> resourceConsumer) { + Request request = new Request.Builder() + .get() + .url(callbackUrl) + .build(); + + try (Response response = httpClientReference.get().newCall(request).execute()) { + if (!response.isSuccessful()) { + throw new C2ServerException("Resource content retrieval failed with HTTP return code " + response.code()); + } + return ofNullable(response.body()) + .map(ResponseBody::byteStream) + .map(resourceConsumer::apply) + .orElseThrow(() -> new C2ServerException("Resource content retrieval failed with empty body")); + } catch (Exception e) { + logger.warn("Resource item retrieval failed", e); + return empty(); + } + } + @Override public Optional uploadBundle(String callbackUrl, byte[] bundle) { Request request = new Request.Builder() @@ -115,7 +142,7 @@ public class C2HttpClient implements C2Client { logger.error("Could not upload bundle to C2 server {}", callbackUrl, e); return Optional.of("Could not upload bundle to C2 server"); } - return Optional.empty(); + return empty(); } @Override @@ -124,7 +151,7 @@ public class C2HttpClient implements C2Client { } private Optional sendHeartbeat(String heartbeat) { - Optional c2HeartbeatResponse = Optional.empty(); + Optional c2HeartbeatResponse = empty(); Request request = new Request.Builder() .post(RequestBody.create(heartbeat, MEDIA_TYPE_APPLICATION_JSON)) .url(c2UrlProvider.getHeartbeatUrl()) @@ -151,7 +178,7 @@ public class C2HttpClient implements C2Client { logger.error("HTTP Request failed", e); } - return Optional.ofNullable(responseBody); + return ofNullable(responseBody); } private void sendAck(Request request) { @@ -165,7 +192,7 @@ public class C2HttpClient implements C2Client { } private Optional retrieveContent(String callbackUrl, Map httpHeaders) { - Optional content = Optional.empty(); + Optional content = empty(); Request.Builder requestBuilder = new Request.Builder() .get() @@ -174,10 +201,10 @@ public class C2HttpClient implements C2Client { Request request = requestBuilder.build(); try (Response response = httpClientReference.get().newCall(request).execute()) { - Optional body = Optional.ofNullable(response.body()); + Optional body = ofNullable(response.body()); if (!response.isSuccessful()) { - StringBuilder messageBuilder = new StringBuilder(String.format("Update content retrieval failed: HTTP %d", response.code())); + StringBuilder messageBuilder = new StringBuilder(format("Update content retrieval failed: HTTP %d", response.code())); body.map(Object::toString).ifPresent(messageBuilder::append); throw new C2ServerException(messageBuilder.toString()); } diff --git a/c2/c2-client-bundle/c2-client-http/src/test/java/org/apache/nifi/c2/client/http/C2HttpClientTest.java b/c2/c2-client-bundle/c2-client-http/src/test/java/org/apache/nifi/c2/client/http/C2HttpClientTest.java index d8167b5112..23bf116e8d 100644 --- a/c2/c2-client-bundle/c2-client-http/src/test/java/org/apache/nifi/c2/client/http/C2HttpClientTest.java +++ b/c2/c2-client-bundle/c2-client-http/src/test/java/org/apache/nifi/c2/client/http/C2HttpClientTest.java @@ -86,7 +86,7 @@ public class C2HttpClientTest { mockWebServer.enqueue(new MockResponse().setBody("responseBody")); when(serializer.serialize(any(C2Heartbeat.class))).thenReturn(Optional.of("Heartbeat")); - when(serializer.deserialize(any(), any())).thenReturn(Optional.of(hbResponse)); + when(serializer.deserialize(any(), any(Class.class))).thenReturn(Optional.of(hbResponse)); C2HttpClient c2HttpClient = C2HttpClient.create(c2ClientConfig, serializer); Optional response = c2HttpClient.publishHeartbeat(new C2Heartbeat()); diff --git a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2HeartbeatFactory.java b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2HeartbeatFactory.java index b5d9e9824e..06549b5663 100644 --- a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2HeartbeatFactory.java +++ b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2HeartbeatFactory.java @@ -38,6 +38,7 @@ import java.util.Comparator; import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.function.Supplier; import org.apache.nifi.c2.client.C2ClientConfig; import org.apache.nifi.c2.client.PersistentUuidGenerator; import org.apache.nifi.c2.client.service.model.RuntimeInfoWrapper; @@ -46,12 +47,14 @@ import org.apache.nifi.c2.protocol.api.AgentManifest; import org.apache.nifi.c2.protocol.api.AgentRepositories; import org.apache.nifi.c2.protocol.api.AgentResourceConsumption; import org.apache.nifi.c2.protocol.api.AgentStatus; +import org.apache.nifi.c2.protocol.api.ResourceInfo; import org.apache.nifi.c2.protocol.api.C2Heartbeat; import org.apache.nifi.c2.protocol.api.DeviceInfo; import org.apache.nifi.c2.protocol.api.FlowInfo; import org.apache.nifi.c2.protocol.api.FlowQueueStatus; import org.apache.nifi.c2.protocol.api.NetworkInfo; import org.apache.nifi.c2.protocol.api.SupportedOperation; +import org.apache.nifi.c2.protocol.api.ResourcesGlobalHash; import org.apache.nifi.c2.protocol.api.SystemInfo; import org.apache.nifi.c2.protocol.component.api.RuntimeManifest; import org.slf4j.Logger; @@ -67,15 +70,18 @@ public class C2HeartbeatFactory { private final C2ClientConfig clientConfig; private final FlowIdHolder flowIdHolder; private final ManifestHashProvider manifestHashProvider; + private final Supplier resourcesGlobalHashSupplier; private String agentId; private String deviceId; private File confDirectory; - public C2HeartbeatFactory(C2ClientConfig clientConfig, FlowIdHolder flowIdHolder, ManifestHashProvider manifestHashProvider) { + public C2HeartbeatFactory(C2ClientConfig clientConfig, FlowIdHolder flowIdHolder, ManifestHashProvider manifestHashProvider, + Supplier resourcesGlobalHashSupplier) { this.clientConfig = clientConfig; this.flowIdHolder = flowIdHolder; this.manifestHashProvider = manifestHashProvider; + this.resourcesGlobalHashSupplier = resourcesGlobalHashSupplier; } public C2Heartbeat create(RuntimeInfoWrapper runtimeInfoWrapper) { @@ -86,6 +92,10 @@ public class C2HeartbeatFactory { heartbeat.setFlowInfo(getFlowInfo(runtimeInfoWrapper.getQueueStatus())); heartbeat.setCreated(System.currentTimeMillis()); + ResourceInfo resourceInfo = new ResourceInfo(); + resourceInfo.setHash(resourcesGlobalHashSupplier.get().getDigest()); + heartbeat.setResourceInfo(resourceInfo); + return heartbeat; } diff --git a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/C2OperationHandler.java b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/C2OperationHandler.java index d8668acaa0..4ecd662734 100644 --- a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/C2OperationHandler.java +++ b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/C2OperationHandler.java @@ -14,13 +14,20 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.nifi.c2.client.service.operation; +import static java.util.Optional.ofNullable; + +import com.fasterxml.jackson.core.type.TypeReference; import java.util.Map; +import java.util.Optional; import org.apache.nifi.c2.protocol.api.C2Operation; import org.apache.nifi.c2.protocol.api.C2OperationAck; +import org.apache.nifi.c2.protocol.api.C2OperationState; import org.apache.nifi.c2.protocol.api.OperandType; import org.apache.nifi.c2.protocol.api.OperationType; +import org.apache.nifi.c2.serializer.C2Serializer; /** * Handler interface for the different operation types @@ -50,6 +57,7 @@ public interface C2OperationHandler { /** * Determines if the given operation requires to restart the MiNiFi process + * * @return true if it requires restart, false otherwise */ default boolean requiresRestart() { @@ -63,4 +71,60 @@ public interface C2OperationHandler { * @return the result of the operation handling */ C2OperationAck handle(C2Operation operation); + + /** + * Commonly used logic for creating an C2OperationState object + * + * @param operationState the state of the operation + * @param details additional status info to detail the state + * @return the created state + */ + default C2OperationState operationState(C2OperationState.OperationState operationState, String details) { + C2OperationState state = new C2OperationState(); + state.setState(operationState); + state.setDetails(details); + return state; + } + + /** + * Commonly used logic for creating an C2OperationAck object + * + * @param operationId the identifier of the operation + * @param operationState the state of the operation + * @return the created operation ack object + */ + default C2OperationAck operationAck(String operationId, C2OperationState operationState) { + C2OperationAck operationAck = new C2OperationAck(); + operationAck.setOperationState(operationState); + operationAck.setOperationId(operationId); + return operationAck; + } + + /** + * Commonly used logic for retrieving a string value from the operation arguments' map + * + * @param operation the operation with arguments + * @param argument the name of the argument to retrieve + * @return the optional retrieved argument value + */ + default Optional getOperationArg(C2Operation operation, String argument) { + return ofNullable(operation.getArgs()) + .map(args -> args.get(argument)) + .map(arg -> arg instanceof String s ? s : null); + } + + /** + * Commonly used logic for retrieving a JSON object from the operation arguments' map and converting it to a model class + * + * @param operation the operation with arguments + * @param argument the name of the argument to retrieve + * @param type the target type to serialize the object to + * @param serializer the serializer used to converting to the target class + * @return the optional retrieved and converted argument value + */ + default Optional getOperationArg(C2Operation operation, String argument, TypeReference type, C2Serializer serializer) { + return ofNullable(operation.getArgs()) + .map(args -> args.get(argument)) + .flatMap(arg -> serializer.convert(arg, type)); + } } diff --git a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/DescribeManifestOperationHandler.java b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/DescribeManifestOperationHandler.java index 1045b4acf1..41fc564613 100644 --- a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/DescribeManifestOperationHandler.java +++ b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/DescribeManifestOperationHandler.java @@ -14,14 +14,16 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.nifi.c2.client.service.operation; +import static java.util.Optional.ofNullable; import static org.apache.commons.lang3.StringUtils.EMPTY; +import static org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.FULLY_APPLIED; import static org.apache.nifi.c2.protocol.api.OperandType.MANIFEST; import static org.apache.nifi.c2.protocol.api.OperationType.DESCRIBE; import java.util.Map; -import java.util.Optional; import java.util.function.Supplier; import org.apache.nifi.c2.client.service.C2HeartbeatFactory; import org.apache.nifi.c2.client.service.model.RuntimeInfoWrapper; @@ -29,7 +31,6 @@ import org.apache.nifi.c2.protocol.api.AgentInfo; import org.apache.nifi.c2.protocol.api.C2Heartbeat; import org.apache.nifi.c2.protocol.api.C2Operation; import org.apache.nifi.c2.protocol.api.C2OperationAck; -import org.apache.nifi.c2.protocol.api.C2OperationState; import org.apache.nifi.c2.protocol.api.OperandType; import org.apache.nifi.c2.protocol.api.OperationType; @@ -40,7 +41,7 @@ public class DescribeManifestOperationHandler implements C2OperationHandler { private final OperandPropertiesProvider operandPropertiesProvider; public DescribeManifestOperationHandler(C2HeartbeatFactory heartbeatFactory, Supplier runtimeInfoSupplier, - OperandPropertiesProvider operandPropertiesProvider) { + OperandPropertiesProvider operandPropertiesProvider) { this.heartbeatFactory = heartbeatFactory; this.runtimeInfoSupplier = runtimeInfoSupplier; this.operandPropertiesProvider = operandPropertiesProvider; @@ -58,25 +59,24 @@ public class DescribeManifestOperationHandler implements C2OperationHandler { @Override public C2OperationAck handle(C2Operation operation) { - String opIdentifier = Optional.ofNullable(operation.getIdentifier()) - .orElse(EMPTY); - C2OperationAck operationAck = new C2OperationAck(); - C2OperationState state = new C2OperationState(); - operationAck.setOperationState(state); - operationAck.setOperationId(opIdentifier); + String operationId = ofNullable(operation.getIdentifier()).orElse(EMPTY); RuntimeInfoWrapper runtimeInfoWrapper = runtimeInfoSupplier.get(); C2Heartbeat heartbeat = heartbeatFactory.create(runtimeInfoWrapper); + C2OperationAck c2OperationAck = operationAck(operationId, operationState(FULLY_APPLIED, EMPTY)); + c2OperationAck.setAgentInfo(agentInfo(heartbeat, runtimeInfoWrapper)); + c2OperationAck.setDeviceInfo(heartbeat.getDeviceInfo()); + c2OperationAck.setFlowInfo(heartbeat.getFlowInfo()); + c2OperationAck.setResourceInfo(heartbeat.getResourceInfo()); + + return c2OperationAck; + } + + private AgentInfo agentInfo(C2Heartbeat heartbeat, RuntimeInfoWrapper runtimeInfoWrapper) { AgentInfo agentInfo = heartbeat.getAgentInfo(); agentInfo.setAgentManifest(runtimeInfoWrapper.getManifest()); - operationAck.setAgentInfo(agentInfo); - operationAck.setDeviceInfo(heartbeat.getDeviceInfo()); - operationAck.setFlowInfo(heartbeat.getFlowInfo()); - - state.setState(C2OperationState.OperationState.FULLY_APPLIED); - - return operationAck; + return agentInfo; } @Override diff --git a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/SyncResourceOperationHandler.java b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/SyncResourceOperationHandler.java new file mode 100644 index 0000000000..839c287a25 --- /dev/null +++ b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/SyncResourceOperationHandler.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.c2.client.service.operation; + +import static java.util.Optional.ofNullable; +import static org.apache.commons.lang3.StringUtils.EMPTY; +import static org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.NOT_APPLIED; +import static org.apache.nifi.c2.protocol.api.OperandType.RESOURCE; +import static org.apache.nifi.c2.protocol.api.OperationType.SYNC; +import static org.apache.nifi.c2.util.Preconditions.requires; + +import com.fasterxml.jackson.core.type.TypeReference; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import org.apache.nifi.c2.client.api.C2Client; +import org.apache.nifi.c2.protocol.api.C2Operation; +import org.apache.nifi.c2.protocol.api.C2OperationAck; +import org.apache.nifi.c2.protocol.api.C2OperationState; +import org.apache.nifi.c2.protocol.api.C2OperationState.OperationState; +import org.apache.nifi.c2.protocol.api.OperandType; +import org.apache.nifi.c2.protocol.api.OperationType; +import org.apache.nifi.c2.protocol.api.ResourceItem; +import org.apache.nifi.c2.protocol.api.ResourcesGlobalHash; +import org.apache.nifi.c2.serializer.C2Serializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class SyncResourceOperationHandler implements C2OperationHandler { + + static final String GLOBAL_HASH_FIELD = "globalHash"; + static final String RESOURCE_LIST_FIELD = "resourceList"; + + private static final Logger LOG = LoggerFactory.getLogger(SyncResourceOperationHandler.class); + + private final C2Client c2Client; + private final OperandPropertiesProvider operandPropertiesProvider; + private final SyncResourceStrategy syncResourceStrategy; + private final C2Serializer c2Serializer; + + public SyncResourceOperationHandler(C2Client c2Client, OperandPropertiesProvider operandPropertiesProvider, SyncResourceStrategy syncResourceStrategy, + C2Serializer c2Serializer) { + this.c2Client = c2Client; + this.operandPropertiesProvider = operandPropertiesProvider; + this.syncResourceStrategy = syncResourceStrategy; + this.c2Serializer = c2Serializer; + } + + public static SyncResourceOperationHandler create(C2Client c2Client, OperandPropertiesProvider operandPropertiesProvider, SyncResourceStrategy syncResourceStrategy, + C2Serializer c2Serializer) { + requires(c2Client != null, "C2Client should not be null"); + requires(operandPropertiesProvider != null, "OperandPropertiesProvider should not be not null"); + requires(syncResourceStrategy != null, "Sync resource strategy should not be null"); + requires(c2Serializer != null, "C2 serializer should not be null"); + return new SyncResourceOperationHandler(c2Client, operandPropertiesProvider, syncResourceStrategy, c2Serializer); + } + + @Override + public OperationType getOperationType() { + return SYNC; + } + + @Override + public OperandType getOperandType() { + return RESOURCE; + } + + @Override + public Map getProperties() { + return operandPropertiesProvider.getProperties(); + } + + @Override + public C2OperationAck handle(C2Operation operation) { + String operationId = ofNullable(operation.getIdentifier()).orElse(EMPTY); + + Optional resourcesGlobalHash = getOperationArg(operation, GLOBAL_HASH_FIELD, new TypeReference<>() { + }, c2Serializer); + if (resourcesGlobalHash.isEmpty()) { + LOG.error("Resources global hash could not be constructed from C2 request"); + return operationAck(operationId, operationState(NOT_APPLIED, "Resources global hash element was not found")); + } + Optional> resourceItems = getOperationArg(operation, RESOURCE_LIST_FIELD, new TypeReference<>() { + }, c2Serializer); + if (resourceItems.isEmpty()) { + LOG.error("Resource item list could not be constructed from C2 request"); + return operationAck(operationId, operationState(NOT_APPLIED, "Resource item list element was not found")); + } + + OperationState operationState = syncResourceStrategy.synchronizeResourceRepository(resourcesGlobalHash.get(), resourceItems.get(), c2Client::retrieveResourceItem, + relativeUrl -> c2Client.getCallbackUrl(null, relativeUrl)); + C2OperationState resultState = operationState( + operationState, + switch (operationState) { + case NOT_APPLIED -> "No resource items were retrieved, please check the log for errors"; + case PARTIALLY_APPLIED -> "Resource repository is partially synced, retrieving some items failed. Pleas check log for errors"; + case FULLY_APPLIED -> "Agent Resource repository is in sync with the C2 server"; + default -> "Unexpected status, please check the log for errors"; + } + ); + + return operationAck(operationId, resultState); + } +} diff --git a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/SyncResourceStrategy.java b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/SyncResourceStrategy.java new file mode 100644 index 0000000000..213614204a --- /dev/null +++ b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/SyncResourceStrategy.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.c2.client.service.operation; + +import java.io.InputStream; +import java.nio.file.Path; +import java.util.List; +import java.util.Optional; +import java.util.function.BiFunction; +import java.util.function.Function; +import org.apache.nifi.c2.protocol.api.C2OperationState.OperationState; +import org.apache.nifi.c2.protocol.api.ResourceItem; +import org.apache.nifi.c2.protocol.api.ResourcesGlobalHash; + +public interface SyncResourceStrategy { + + OperationState synchronizeResourceRepository(ResourcesGlobalHash resourcesGlobalHash, List c2ServerItems, + BiFunction>, Optional> resourceDownloadFunction, + Function> urlEnrichFunction); +} diff --git a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/TransferDebugOperationHandler.java b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/TransferDebugOperationHandler.java index e2b045bfc9..083464d578 100644 --- a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/TransferDebugOperationHandler.java +++ b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/TransferDebugOperationHandler.java @@ -21,7 +21,6 @@ import static java.nio.file.Files.copy; import static java.nio.file.Files.createTempDirectory; import static java.nio.file.Files.lines; import static java.nio.file.Files.walk; -import static java.util.Collections.emptyMap; import static java.util.Optional.empty; import static java.util.Optional.ofNullable; import static org.apache.commons.io.IOUtils.closeQuietly; @@ -30,6 +29,7 @@ import static org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.FU import static org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.NOT_APPLIED; import static org.apache.nifi.c2.protocol.api.OperandType.DEBUG; import static org.apache.nifi.c2.protocol.api.OperationType.TRANSFER; +import static org.apache.nifi.c2.util.Preconditions.requires; import java.io.ByteArrayOutputStream; import java.io.File; @@ -55,7 +55,6 @@ import org.apache.nifi.c2.client.api.C2Client; import org.apache.nifi.c2.protocol.api.C2Operation; import org.apache.nifi.c2.protocol.api.C2OperationAck; import org.apache.nifi.c2.protocol.api.C2OperationState; -import org.apache.nifi.c2.protocol.api.C2OperationState.OperationState; import org.apache.nifi.c2.protocol.api.OperandType; import org.apache.nifi.c2.protocol.api.OperationType; import org.slf4j.Logger; @@ -88,18 +87,10 @@ public class TransferDebugOperationHandler implements C2OperationHandler { public static TransferDebugOperationHandler create(C2Client c2Client, OperandPropertiesProvider operandPropertiesProvider, List bundleFilePaths, Predicate contentFilter) { - if (c2Client == null) { - throw new IllegalArgumentException("C2Client should not be null"); - } - if (operandPropertiesProvider == null) { - throw new IllegalArgumentException("OperandPropertiesProvider should not be not null"); - } - if (bundleFilePaths == null || bundleFilePaths.isEmpty()) { - throw new IllegalArgumentException("bundleFilePaths should not be not null or empty"); - } - if (contentFilter == null) { - throw new IllegalArgumentException("Content filter should not be null"); - } + requires(c2Client != null, "C2Client should not be null"); + requires(operandPropertiesProvider != null, "OperandPropertiesProvider should not be not null"); + requires(bundleFilePaths != null && !bundleFilePaths.isEmpty(), "BundleFilePaths should not be not null or empty"); + requires(contentFilter != null, "Content filter should not be null"); return new TransferDebugOperationHandler(c2Client, operandPropertiesProvider, bundleFilePaths, contentFilter); } @@ -120,17 +111,18 @@ public class TransferDebugOperationHandler implements C2OperationHandler { @Override public C2OperationAck handle(C2Operation operation) { - Map arguments = ofNullable(operation.getArgs()).orElse(emptyMap()); - Optional callbackUrl = c2Client.getCallbackUrl(arguments.get(TARGET_ARG), arguments.get(RELATIVE_TARGET_ARG)); - if (!callbackUrl.isPresent()) { + String operationId = ofNullable(operation.getIdentifier()).orElse(EMPTY); + + Optional callbackUrl = c2Client.getCallbackUrl(getOperationArg(operation, TARGET_ARG).orElse(EMPTY), getOperationArg(operation, RELATIVE_TARGET_ARG).orElse(EMPTY)); + if (callbackUrl.isEmpty()) { LOG.error("Callback URL could not be constructed from C2 request and current configuration"); - return operationAck(operation, operationState(NOT_APPLIED, C2_CALLBACK_URL_NOT_FOUND)); + return operationAck(operationId, operationState(NOT_APPLIED, C2_CALLBACK_URL_NOT_FOUND)); } List preparedFiles = null; C2OperationState operationState; try { - preparedFiles = prepareFiles(operation.getIdentifier(), bundleFilePaths); + preparedFiles = prepareFiles(operationId, bundleFilePaths); operationState = createDebugBundle(preparedFiles) .map(bundle -> c2Client.uploadBundle(callbackUrl.get(), bundle) .map(errorMessage -> operationState(NOT_APPLIED, errorMessage)) @@ -144,21 +136,7 @@ public class TransferDebugOperationHandler implements C2OperationHandler { } LOG.debug("Returning operation ack for operation {} with state {} and details {}", operation.getIdentifier(), operationState.getState(), operationState.getDetails()); - return operationAck(operation, operationState); - } - - private C2OperationAck operationAck(C2Operation operation, C2OperationState state) { - C2OperationAck operationAck = new C2OperationAck(); - operationAck.setOperationId(ofNullable(operation.getIdentifier()).orElse(EMPTY)); - operationAck.setOperationState(state); - return operationAck; - } - - private C2OperationState operationState(OperationState operationState, String details) { - C2OperationState state = new C2OperationState(); - state.setState(operationState); - state.setDetails(details); - return state; + return operationAck(operationId, operationState); } private List prepareFiles(String operationId, List bundleFilePaths) throws IOException { diff --git a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/UpdateAssetOperationHandler.java b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/UpdateAssetOperationHandler.java index eb775c3da0..76847a58c1 100644 --- a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/UpdateAssetOperationHandler.java +++ b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/UpdateAssetOperationHandler.java @@ -17,8 +17,7 @@ package org.apache.nifi.c2.client.service.operation; -import static java.lang.Boolean.parseBoolean; -import static java.util.Collections.emptyMap; +import static java.lang.Boolean.FALSE; import static java.util.Optional.ofNullable; import static org.apache.commons.lang3.StringUtils.EMPTY; import static org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.FULLY_APPLIED; @@ -26,6 +25,7 @@ import static org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.NO import static org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.NO_OPERATION; import static org.apache.nifi.c2.protocol.api.OperandType.ASSET; import static org.apache.nifi.c2.protocol.api.OperationType.UPDATE; +import static org.apache.nifi.c2.util.Preconditions.requires; import java.util.Map; import java.util.Optional; @@ -35,7 +35,6 @@ import org.apache.nifi.c2.client.api.C2Client; import org.apache.nifi.c2.protocol.api.C2Operation; import org.apache.nifi.c2.protocol.api.C2OperationAck; import org.apache.nifi.c2.protocol.api.C2OperationState; -import org.apache.nifi.c2.protocol.api.C2OperationState.OperationState; import org.apache.nifi.c2.protocol.api.OperandType; import org.apache.nifi.c2.protocol.api.OperationType; import org.slf4j.Logger; @@ -72,19 +71,10 @@ public class UpdateAssetOperationHandler implements C2OperationHandler { public static UpdateAssetOperationHandler create(C2Client c2Client, OperandPropertiesProvider operandPropertiesProvider, BiPredicate assetUpdatePrecondition, BiFunction assetPersistFunction) { - if (c2Client == null) { - throw new IllegalArgumentException("C2Client should not be null"); - } - if (operandPropertiesProvider == null) { - throw new IllegalArgumentException("OperandPropertiesProvider should not be not null"); - } - if (assetUpdatePrecondition == null) { - throw new IllegalArgumentException("Asset update precondition should not be null"); - } - if (assetPersistFunction == null) { - throw new IllegalArgumentException("Asset persist function should not be null"); - } - + requires(c2Client != null, "C2Client should not be null"); + requires(operandPropertiesProvider != null, "OperandPropertiesProvider should not be not null"); + requires(assetUpdatePrecondition != null, "Asset update precondition should not be null"); + requires(assetPersistFunction != null, "Asset persist function should not be null"); return new UpdateAssetOperationHandler(c2Client, operandPropertiesProvider, assetUpdatePrecondition, assetPersistFunction); } @@ -107,47 +97,30 @@ public class UpdateAssetOperationHandler implements C2OperationHandler { public C2OperationAck handle(C2Operation operation) { String operationId = ofNullable(operation.getIdentifier()).orElse(EMPTY); - Optional callbackUrl = c2Client.getCallbackUrl(getOperationArg(operation, ASSET_URL_KEY), getOperationArg(operation, ASSET_RELATIVE_URL_KEY)); - if (!callbackUrl.isPresent()) { + Optional callbackUrl = + c2Client.getCallbackUrl(getOperationArg(operation, ASSET_URL_KEY).orElse(EMPTY), getOperationArg(operation, ASSET_RELATIVE_URL_KEY).orElse(EMPTY)); + if (callbackUrl.isEmpty()) { LOG.error("Callback URL could not be constructed from C2 request and current configuration"); return operationAck(operationId, operationState(NOT_APPLIED, C2_CALLBACK_URL_NOT_FOUND)); } - String assetFileName = getOperationArg(operation, ASSET_FILE_KEY); - if (assetFileName == null) { + Optional assetFileName = getOperationArg(operation, ASSET_FILE_KEY); + if (assetFileName.isEmpty()) { LOG.error("Asset file name with key={} was not found in C2 request. C2 request arguments={}", ASSET_FILE_KEY, operation.getArgs()); return operationAck(operationId, operationState(NOT_APPLIED, ASSET_FILE_NAME_NOT_FOUND)); } - boolean forceDownload = parseBoolean(getOperationArg(operation, ASSET_FORCE_DOWNLOAD_KEY)); + boolean forceDownload = getOperationArg(operation, ASSET_FORCE_DOWNLOAD_KEY).map(Boolean::parseBoolean).orElse(FALSE); LOG.info("Initiating asset update from url {} with name {}, force update is {}", callbackUrl, assetFileName, forceDownload); - C2OperationState operationState = assetUpdatePrecondition.test(assetFileName, forceDownload) + C2OperationState operationState = assetUpdatePrecondition.test(assetFileName.get(), forceDownload) ? c2Client.retrieveUpdateAssetContent(callbackUrl.get()) - .map(content -> assetPersistFunction.apply(assetFileName, content) - ? operationState(FULLY_APPLIED, SUCCESSFULLY_UPDATE_ASSET) - : operationState(NOT_APPLIED, FAILED_TO_PERSIST_ASSET_TO_DISK)) - .orElseGet(() -> operationState(NOT_APPLIED, UPDATE_ASSET_RETRIEVAL_RESULTED_IN_EMPTY_CONTENT)) + .map(content -> assetPersistFunction.apply(assetFileName.get(), content) + ? operationState(FULLY_APPLIED, SUCCESSFULLY_UPDATE_ASSET) + : operationState(NOT_APPLIED, FAILED_TO_PERSIST_ASSET_TO_DISK)) + .orElseGet(() -> operationState(NOT_APPLIED, UPDATE_ASSET_RETRIEVAL_RESULTED_IN_EMPTY_CONTENT)) : operationState(NO_OPERATION, UPDATE_ASSET_PRECONDITIONS_WERE_NOT_MET); return operationAck(operationId, operationState); } - - private String getOperationArg(C2Operation operation, String argument) { - return ofNullable(operation.getArgs()).orElse(emptyMap()).get(argument); - } - - private C2OperationState operationState(OperationState operationState, String details) { - C2OperationState state = new C2OperationState(); - state.setState(operationState); - state.setDetails(details); - return state; - } - - private C2OperationAck operationAck(String operationId, C2OperationState operationState) { - C2OperationAck operationAck = new C2OperationAck(); - operationAck.setOperationState(operationState); - operationAck.setOperationId(operationId); - return operationAck; - } } diff --git a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/UpdateConfigurationOperationHandler.java b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/UpdateConfigurationOperationHandler.java index d2f4c7e9e1..ecc31920a4 100644 --- a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/UpdateConfigurationOperationHandler.java +++ b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/UpdateConfigurationOperationHandler.java @@ -17,7 +17,7 @@ package org.apache.nifi.c2.client.service.operation; -import static java.util.Collections.emptyMap; +import static java.util.Optional.empty; import static java.util.Optional.ofNullable; import static org.apache.commons.lang3.StringUtils.EMPTY; import static org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.FULLY_APPLIED; @@ -88,32 +88,31 @@ public class UpdateConfigurationOperationHandler implements C2OperationHandler { @Override public C2OperationAck handle(C2Operation operation) { - String operationId = Optional.ofNullable(operation.getIdentifier()).orElse(EMPTY); + String operationId = ofNullable(operation.getIdentifier()).orElse(EMPTY); - Map arguments = ofNullable(operation.getArgs()).orElse(emptyMap()); - String absoluteFlowUrl = ofNullable(arguments.get(FLOW_URL_KEY)).orElse(arguments.get(LOCATION)); - Optional callbackUrl = client.getCallbackUrl(absoluteFlowUrl, arguments.get(FLOW_RELATIVE_URL_KEY)); - if (!callbackUrl.isPresent()) { + String absoluteFlowUrl = getOperationArg(operation, FLOW_URL_KEY).orElse(getOperationArg(operation, LOCATION).orElse(EMPTY)); + Optional callbackUrl = client.getCallbackUrl(absoluteFlowUrl, getOperationArg(operation, FLOW_RELATIVE_URL_KEY).orElse(EMPTY)); + if (callbackUrl.isEmpty()) { logger.error("Callback URL could not be constructed from C2 request and current configuration"); return operationAck(operationId, operationState(NOT_APPLIED, "Could not get callback url from operation and current configuration")); } - String flowId = getFlowId(operation.getArgs(), callbackUrl.get()); - if (flowId == null) { + Optional flowId = getFlowId(operation, callbackUrl.get()); + if (flowId.isEmpty()) { logger.error("FlowId is missing, no update will be performed"); return operationAck(operationId, operationState(NOT_APPLIED, "Could not get flowId from the operation")); } - if (flowIdHolder.getFlowId() != null && flowIdHolder.getFlowId().equals(flowId)) { + if (flowIdHolder.getFlowId() != null && flowIdHolder.getFlowId().equals(flowId.get())) { logger.info("Flow is current, no update is necessary"); return operationAck(operationId, operationState(NO_OPERATION, "Flow is current, no update is necessary")); } logger.info("Will perform flow update from {} for operation #{}. Previous flow id was {}, replacing with new id {}", - callbackUrl, operationId, ofNullable(flowIdHolder.getFlowId()).orElse("not set"), flowId); + callbackUrl, operationId, ofNullable(flowIdHolder.getFlowId()).orElse("not set"), flowId.get()); C2OperationState state = updateFlow(operationId, callbackUrl.get()); if (state.getState() == FULLY_APPLIED) { - flowIdHolder.setFlowId(flowId); + flowIdHolder.setFlowId(flowId.get()); } return operationAck(operationId, state); } @@ -121,7 +120,7 @@ public class UpdateConfigurationOperationHandler implements C2OperationHandler { private C2OperationState updateFlow(String opIdentifier, String callbackUrl) { Optional updateContent = client.retrieveUpdateConfigurationContent(callbackUrl); - if (!updateContent.isPresent()) { + if (updateContent.isEmpty()) { logger.error("Update content retrieval resulted in empty content so flow update was omitted for operation #{}.", opIdentifier); return operationState(NOT_APPLIED, "Update content retrieval resulted in empty content"); } @@ -135,37 +134,21 @@ public class UpdateConfigurationOperationHandler implements C2OperationHandler { return operationState(FULLY_APPLIED, "Update configuration applied successfully"); } - private String getFlowId(Map args, String callbackUrl) { - return Optional.ofNullable(args) - .map(map -> map.get(FLOW_ID)) - .orElseGet(() -> parseFlowId(callbackUrl)); + private Optional getFlowId(C2Operation operation, String callbackUrl) { + return getOperationArg(operation, FLOW_ID).or(() -> parseFlowId(callbackUrl)); } - private String parseFlowId(String callbackUrl) { + private Optional parseFlowId(String callbackUrl) { try { URI flowUri = new URI(callbackUrl); Matcher matcher = FLOW_ID_PATTERN.matcher(flowUri.getPath()); if (matcher.matches()) { - return matcher.group(1); + return ofNullable(matcher.group(1)); } } catch (Exception e) { logger.error("Could not get flow id from the provided URL, flow update URL format unexpected [{}]", callbackUrl); } - return null; - } - - private C2OperationState operationState(C2OperationState.OperationState operationState, String details) { - C2OperationState state = new C2OperationState(); - state.setState(operationState); - state.setDetails(details); - return state; - } - - private C2OperationAck operationAck(String operationId, C2OperationState operationState) { - C2OperationAck operationAck = new C2OperationAck(); - operationAck.setOperationState(operationState); - operationAck.setOperationId(operationId); - return operationAck; + return empty(); } } diff --git a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/UpdatePropertiesOperationHandler.java b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/UpdatePropertiesOperationHandler.java index 2a9ac64cc2..af8fe0343a 100644 --- a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/UpdatePropertiesOperationHandler.java +++ b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/UpdatePropertiesOperationHandler.java @@ -17,6 +17,8 @@ package org.apache.nifi.c2.client.service.operation; +import static java.util.Optional.ofNullable; +import static org.apache.commons.lang3.StringUtils.EMPTY; import static org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.FULLY_APPLIED; import static org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.NOT_APPLIED; import static org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.NO_OPERATION; @@ -38,9 +40,9 @@ public class UpdatePropertiesOperationHandler implements C2OperationHandler { private static final Logger LOGGER = LoggerFactory.getLogger(UpdatePropertiesOperationHandler.class); private final OperandPropertiesProvider operandPropertiesProvider; - private final Function, Boolean> persistProperties; + private final Function, Boolean> persistProperties; - public UpdatePropertiesOperationHandler(OperandPropertiesProvider operandPropertiesProvider, Function, Boolean> persistProperties) { + public UpdatePropertiesOperationHandler(OperandPropertiesProvider operandPropertiesProvider, Function, Boolean> persistProperties) { this.operandPropertiesProvider = operandPropertiesProvider; this.persistProperties = persistProperties; } @@ -62,27 +64,24 @@ public class UpdatePropertiesOperationHandler implements C2OperationHandler { @Override public C2OperationAck handle(C2Operation operation) { - C2OperationAck c2OperationAck = new C2OperationAck(); - c2OperationAck.setOperationId(operation.getIdentifier()); - C2OperationState operationState = new C2OperationState(); - c2OperationAck.setOperationState(operationState); + String operationId = ofNullable(operation.getIdentifier()).orElse(EMPTY); + + C2OperationState c2OperationState; try { if (persistProperties.apply(operation.getArgs())) { - operationState.setState(FULLY_APPLIED); + c2OperationState = operationState(FULLY_APPLIED, null); } else { LOGGER.info("Properties are already in desired state"); - operationState.setState(NO_OPERATION); + c2OperationState = operationState(NO_OPERATION, null); } } catch (IllegalArgumentException e) { - LOGGER.error(e.getMessage()); - operationState.setState(NOT_APPLIED); - operationState.setDetails(e.getMessage()); + LOGGER.error("Operation not applied due to issues with the arguments: {}", e.getMessage()); + c2OperationState = operationState(NOT_APPLIED, e.getMessage()); } catch (Exception e) { - LOGGER.error("Exception happened during persisting properties", e); - operationState.setState(NOT_APPLIED); - operationState.setDetails("Failed to persist properties"); + LOGGER.error("Unexpected error happened during persisting properties", e); + c2OperationState = operationState(NOT_APPLIED, "Failed to persist properties"); } - return c2OperationAck; + return operationAck(operationId, c2OperationState); } @Override diff --git a/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/C2HeartbeatFactoryTest.java b/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/C2HeartbeatFactoryTest.java index ccc8a01e47..4fb20aeb86 100644 --- a/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/C2HeartbeatFactoryTest.java +++ b/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/C2HeartbeatFactoryTest.java @@ -29,6 +29,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Set; +import java.util.function.Supplier; import org.apache.nifi.c2.client.C2ClientConfig; import org.apache.nifi.c2.client.service.model.RuntimeInfoWrapper; import org.apache.nifi.c2.protocol.api.AgentManifest; @@ -37,6 +38,7 @@ import org.apache.nifi.c2.protocol.api.C2Heartbeat; import org.apache.nifi.c2.protocol.api.FlowQueueStatus; import org.apache.nifi.c2.protocol.api.OperationType; import org.apache.nifi.c2.protocol.api.SupportedOperation; +import org.apache.nifi.c2.protocol.api.ResourcesGlobalHash; import org.apache.nifi.c2.protocol.component.api.Bundle; import org.apache.nifi.c2.protocol.component.api.RuntimeManifest; import org.junit.jupiter.api.BeforeEach; @@ -53,6 +55,7 @@ public class C2HeartbeatFactoryTest { private static final String AGENT_CLASS = "agentClass"; private static final String FLOW_ID = "flowId"; private static final String MANIFEST_HASH = "hash"; + private static final String RESOURCE_HASH = "resourceHash"; @Mock private C2ClientConfig clientConfig; @@ -66,6 +69,9 @@ public class C2HeartbeatFactoryTest { @Mock private ManifestHashProvider manifestHashProvider; + @Mock + private Supplier resourcesGlobalHashSupplier; + @InjectMocks private C2HeartbeatFactory c2HeartbeatFactory; @@ -82,26 +88,31 @@ public class C2HeartbeatFactoryTest { when(flowIdHolder.getFlowId()).thenReturn(FLOW_ID); when(clientConfig.getAgentClass()).thenReturn(AGENT_CLASS); when(runtimeInfoWrapper.getManifest()).thenReturn(createManifest()); + when(resourcesGlobalHashSupplier.get()).thenReturn(createResourcesGlobalHash()); C2Heartbeat heartbeat = c2HeartbeatFactory.create(runtimeInfoWrapper); assertEquals(FLOW_ID, heartbeat.getFlowId()); assertEquals(AGENT_CLASS, heartbeat.getAgentClass()); + assertEquals(RESOURCE_HASH, heartbeat.getResourceInfo().getHash()); } @Test void testCreateGeneratesAgentAndDeviceIdIfNotPresent() { when(runtimeInfoWrapper.getManifest()).thenReturn(createManifest()); + when(resourcesGlobalHashSupplier.get()).thenReturn(createResourcesGlobalHash()); C2Heartbeat heartbeat = c2HeartbeatFactory.create(runtimeInfoWrapper); assertNotNull(heartbeat.getAgentId()); assertNotNull(heartbeat.getDeviceId()); + assertEquals(RESOURCE_HASH, heartbeat.getResourceInfo().getHash()); } @Test void testCreatePopulatesFromRuntimeInfoWrapperForFullHeartbeat() { when(clientConfig.isFullHeartbeat()).thenReturn(true); + when(resourcesGlobalHashSupplier.get()).thenReturn(createResourcesGlobalHash()); AgentRepositories repos = new AgentRepositories(); RuntimeManifest manifest = createManifest(); @@ -112,11 +123,13 @@ public class C2HeartbeatFactoryTest { assertEquals(repos, heartbeat.getAgentInfo().getStatus().getRepositories()); assertEquals(manifest, heartbeat.getAgentInfo().getAgentManifest()); assertEquals(queueStatus, heartbeat.getFlowInfo().getQueues()); + assertEquals(RESOURCE_HASH, heartbeat.getResourceInfo().getHash()); } @Test void testCreatePopulatesFromRuntimeInfoWrapperForLightHeartbeat() { when(clientConfig.isFullHeartbeat()).thenReturn(false); + when(resourcesGlobalHashSupplier.get()).thenReturn(createResourcesGlobalHash()); AgentRepositories repos = new AgentRepositories(); RuntimeManifest manifest = createManifest(); @@ -127,6 +140,7 @@ public class C2HeartbeatFactoryTest { assertEquals(repos, heartbeat.getAgentInfo().getStatus().getRepositories()); assertNull(heartbeat.getAgentInfo().getAgentManifest()); assertEquals(queueStatus, heartbeat.getFlowInfo().getQueues()); + assertEquals(RESOURCE_HASH, heartbeat.getResourceInfo().getHash()); } @Test @@ -140,10 +154,12 @@ public class C2HeartbeatFactoryTest { void testAgentManifestHashIsPopulatedInCaseOfRuntimeManifest() { RuntimeManifest manifest = createManifest(); when(manifestHashProvider.calculateManifestHash(manifest.getBundles(), Collections.emptySet())).thenReturn(MANIFEST_HASH); + when(resourcesGlobalHashSupplier.get()).thenReturn(createResourcesGlobalHash()); C2Heartbeat heartbeat = c2HeartbeatFactory.create(new RuntimeInfoWrapper(new AgentRepositories(), manifest, new HashMap<>())); assertEquals(MANIFEST_HASH, heartbeat.getAgentInfo().getAgentManifestHash()); + assertEquals(RESOURCE_HASH, heartbeat.getResourceInfo().getHash()); } @Test @@ -154,10 +170,12 @@ public class C2HeartbeatFactoryTest { Set supportedOperations = Collections.singleton(supportedOperation); manifest.setSupportedOperations(supportedOperations); when(manifestHashProvider.calculateManifestHash(manifest.getBundles(), supportedOperations)).thenReturn(MANIFEST_HASH); + when(resourcesGlobalHashSupplier.get()).thenReturn(createResourcesGlobalHash()); C2Heartbeat heartbeat = c2HeartbeatFactory.create(new RuntimeInfoWrapper(new AgentRepositories(), manifest, new HashMap<>())); assertEquals(MANIFEST_HASH, heartbeat.getAgentInfo().getAgentManifestHash()); + assertEquals(RESOURCE_HASH, heartbeat.getResourceInfo().getHash()); } private RuntimeManifest createManifest() { @@ -167,7 +185,12 @@ public class C2HeartbeatFactoryTest { private RuntimeManifest createManifest(Bundle... bundles) { RuntimeManifest manifest = new RuntimeManifest(); manifest.setBundles(Arrays.asList(bundles)); - return manifest; } + + private ResourcesGlobalHash createResourcesGlobalHash() { + ResourcesGlobalHash resourcesGlobalHash = new ResourcesGlobalHash(); + resourcesGlobalHash.setDigest(RESOURCE_HASH); + return resourcesGlobalHash; + } } diff --git a/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/SyncResourceOperationHandlerTest.java b/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/SyncResourceOperationHandlerTest.java new file mode 100644 index 0000000000..22c8bc68f6 --- /dev/null +++ b/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/SyncResourceOperationHandlerTest.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.c2.client.service.operation; + +import static java.util.Optional.empty; +import static java.util.Optional.ofNullable; +import static org.apache.nifi.c2.client.service.operation.SyncResourceOperationHandler.GLOBAL_HASH_FIELD; +import static org.apache.nifi.c2.client.service.operation.SyncResourceOperationHandler.RESOURCE_LIST_FIELD; +import static org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.NOT_APPLIED; +import static org.apache.nifi.c2.protocol.api.OperandType.RESOURCE; +import static org.apache.nifi.c2.protocol.api.OperationType.SYNC; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyList; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Stream; +import org.apache.nifi.c2.client.api.C2Client; +import org.apache.nifi.c2.protocol.api.C2Operation; +import org.apache.nifi.c2.protocol.api.C2OperationAck; +import org.apache.nifi.c2.protocol.api.C2OperationState.OperationState; +import org.apache.nifi.c2.protocol.api.ResourceItem; +import org.apache.nifi.c2.protocol.api.ResourcesGlobalHash; +import org.apache.nifi.c2.serializer.C2Serializer; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +public class SyncResourceOperationHandlerTest { + + private static final String OPERATION_ID = "operationId"; + + @Mock + private C2Client mockC2Client; + + @Mock + private OperandPropertiesProvider mockOperandPropertiesProvider; + + @Mock + private SyncResourceStrategy mockSyncResourceStrategy; + + @Mock + private C2Serializer mockC2Serializer; + + @InjectMocks + private SyncResourceOperationHandler testHandler; + + @ParameterizedTest(name = "c2Client={0} operandPropertiesProvider={1} syncResourceStrategy={2} c2Serializer={3}") + @MethodSource("invalidConstructorArguments") + public void testAttemptingCreateWithInvalidParametersWillThrowException(C2Client c2Client, OperandPropertiesProvider operandPropertiesProvider, + SyncResourceStrategy syncResourceStrategy, C2Serializer c2Serializer) { + assertThrows(IllegalArgumentException.class, () -> SyncResourceOperationHandler.create(c2Client, operandPropertiesProvider, syncResourceStrategy, c2Serializer)); + } + + @Test + public void testOperationAndOperandTypesAreMatching() { + assertEquals(SYNC, testHandler.getOperationType()); + assertEquals(RESOURCE, testHandler.getOperandType()); + } + + @Test + public void testResourcesGlobalHashArgumentIsNull() { + C2Operation inputOperation = operation(null, List.of(new ResourceItem())); + + C2OperationAck c2OperationAck = testHandler.handle(inputOperation); + + assertEquals(OPERATION_ID, c2OperationAck.getOperationId()); + assertEquals(NOT_APPLIED, c2OperationAck.getOperationState().getState()); + verify(mockSyncResourceStrategy, never()).synchronizeResourceRepository(any(), anyList(), any(), any()); + } + + @Test + public void testResourceListArgumentIsNull() { + ResourcesGlobalHash resourcesGlobalHash = new ResourcesGlobalHash(); + C2Operation inputOperation = operation(resourcesGlobalHash, null); + when(mockC2Serializer.convert(eq(resourcesGlobalHash), any())).thenReturn(ofNullable(resourcesGlobalHash)); + + C2OperationAck c2OperationAck = testHandler.handle(inputOperation); + + assertEquals(OPERATION_ID, c2OperationAck.getOperationId()); + assertEquals(NOT_APPLIED, c2OperationAck.getOperationState().getState()); + verify(mockSyncResourceStrategy, never()).synchronizeResourceRepository(any(), anyList(), any(), any()); + } + + @Test + public void testArgumentConversionFailure() { + ResourcesGlobalHash resourcesGlobalHash = new ResourcesGlobalHash(); + List resourceItems = List.of(new ResourceItem()); + C2Operation inputOperation = operation(resourcesGlobalHash, resourceItems); + when(mockC2Serializer.convert(eq(resourcesGlobalHash), any())).thenReturn(empty()); + + C2OperationAck c2OperationAck = testHandler.handle(inputOperation); + + assertEquals(OPERATION_ID, c2OperationAck.getOperationId()); + assertEquals(NOT_APPLIED, c2OperationAck.getOperationState().getState()); + verify(mockSyncResourceStrategy, never()).synchronizeResourceRepository(any(), anyList(), any(), any()); + } + + @ParameterizedTest(name = "operationState={0}") + @MethodSource("synchronizeStrategyArguments") + public void testSynchronizeResourceStrategyExecutions(OperationState operationState) { + ResourcesGlobalHash resourcesGlobalHash = new ResourcesGlobalHash(); + List resourceItems = List.of(new ResourceItem()); + C2Operation inputOperation = operation(resourcesGlobalHash, resourceItems); + when(mockC2Serializer.convert(eq(resourcesGlobalHash), any())).thenReturn(Optional.of(resourcesGlobalHash)); + when(mockC2Serializer.convert(eq(resourceItems), any())).thenReturn(Optional.of(resourceItems)); + when(mockSyncResourceStrategy.synchronizeResourceRepository(eq(resourcesGlobalHash), eq(resourceItems), any(), any())).thenReturn(operationState); + + C2OperationAck c2OperationAck = testHandler.handle(inputOperation); + + assertEquals(OPERATION_ID, c2OperationAck.getOperationId()); + assertEquals(operationState, c2OperationAck.getOperationState().getState()); + } + + private static Stream invalidConstructorArguments() { + return Stream.of( + Arguments.of(null, null, null, null), + Arguments.of(mock(C2Client.class), null, null, null), + Arguments.of(mock(C2Client.class), mock(OperandPropertiesProvider.class), null, null), + Arguments.of(mock(C2Client.class), mock(OperandPropertiesProvider.class), mock(SyncResourceStrategy.class), null), + Arguments.of(mock(C2Client.class), mock(OperandPropertiesProvider.class), null, mock(C2Serializer.class))); + } + + private static Stream synchronizeStrategyArguments() { + return Stream.of(OperationState.values()).map(Arguments::of); + } + + private C2Operation operation(ResourcesGlobalHash resourcesGlobalHash, List resourceItems) { + C2Operation c2Operation = new C2Operation(); + c2Operation.setIdentifier(OPERATION_ID); + Map arguments = new HashMap<>(); + arguments.put(GLOBAL_HASH_FIELD, resourcesGlobalHash); + arguments.put(RESOURCE_LIST_FIELD, resourceItems); + c2Operation.setArgs(arguments); + return c2Operation; + } +} diff --git a/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/UpdateAssetOperationHandlerTest.java b/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/UpdateAssetOperationHandlerTest.java index a078452fe8..4a171dc8f1 100644 --- a/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/UpdateAssetOperationHandlerTest.java +++ b/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/UpdateAssetOperationHandlerTest.java @@ -211,7 +211,7 @@ public class UpdateAssetOperationHandlerTest { C2Operation c2Operation = new C2Operation(); c2Operation.setIdentifier(OPERATION_ID); - Map arguments = new HashMap<>(); + Map arguments = new HashMap<>(); arguments.put(ASSET_URL_KEY, assetUrl); arguments.put(ASSET_FILE_KEY, assetFile); arguments.put(ASSET_FORCE_DOWNLOAD_KEY, forceDownload); diff --git a/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/UpdateConfigurationOperationHandlerTest.java b/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/UpdateConfigurationOperationHandlerTest.java index d12b1be5f5..d67d56cdc8 100644 --- a/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/UpdateConfigurationOperationHandlerTest.java +++ b/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/UpdateConfigurationOperationHandlerTest.java @@ -48,8 +48,8 @@ public class UpdateConfigurationOperationHandlerTest { private static final String CORRECT_LOCATION = "/path/for/the/" + FLOW_ID; private static final String INCORRECT_LOCATION = "incorrect/location"; - private static final Map CORRECT_LOCATION_MAP = Collections.singletonMap(LOCATION, CORRECT_LOCATION); - private static final Map INCORRECT_LOCATION_MAP = Collections.singletonMap(LOCATION, INCORRECT_LOCATION); + private static final Map CORRECT_LOCATION_MAP = Collections.singletonMap(LOCATION, CORRECT_LOCATION); + private static final Map INCORRECT_LOCATION_MAP = Collections.singletonMap(LOCATION, INCORRECT_LOCATION); @Mock private FlowIdHolder flowIdHolder; @@ -89,7 +89,7 @@ public class UpdateConfigurationOperationHandlerTest { C2Operation operation = new C2Operation(); operation.setIdentifier(OPERATION_ID); - Map args = new HashMap<>(); + Map args = new HashMap<>(); args.putAll(INCORRECT_LOCATION_MAP); args.put(FLOW_ID, "argsFlowId"); operation.setArgs(args); diff --git a/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/UpdatePropertiesOperationHandlerTest.java b/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/UpdatePropertiesOperationHandlerTest.java index b060143dc4..8c34cb4535 100644 --- a/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/UpdatePropertiesOperationHandlerTest.java +++ b/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/UpdatePropertiesOperationHandlerTest.java @@ -41,13 +41,13 @@ import org.mockito.junit.jupiter.MockitoExtension; public class UpdatePropertiesOperationHandlerTest { private static final String ID = "id"; - private static final Map ARGS = Collections.singletonMap("key", "value"); + private static final Map ARGS = Collections.singletonMap("key", "value"); @Mock private OperandPropertiesProvider operandPropertiesProvider; @Mock - private Function, Boolean> persistProperties; + private Function, Boolean> persistProperties; @InjectMocks private UpdatePropertiesOperationHandler updatePropertiesOperationHandler; diff --git a/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/C2Heartbeat.java b/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/C2Heartbeat.java index e09760fe64..b2eb076448 100644 --- a/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/C2Heartbeat.java +++ b/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/C2Heartbeat.java @@ -34,6 +34,7 @@ public class C2Heartbeat implements Serializable { private DeviceInfo deviceInfo; private AgentInfo agentInfo; private FlowInfo flowInfo; + private ResourceInfo resourceInfo; @Schema(hidden = true) public String getIdentifier() { @@ -80,6 +81,15 @@ public class C2Heartbeat implements Serializable { this.flowInfo = flowInfo; } + @Schema(description = "Metadata for the resources currently deployed to the agent") + public ResourceInfo getResourceInfo() { + return resourceInfo; + } + + public void setResourceInfo(ResourceInfo resourceInfo) { + this.resourceInfo = resourceInfo; + } + // Convenience getters @Schema(hidden = true) public String getDeviceId() { @@ -139,5 +149,4 @@ public class C2Heartbeat implements Serializable { public int hashCode() { return Objects.hash(identifier); } - } diff --git a/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/C2Operation.java b/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/C2Operation.java index 6dca210258..7d79e1fdb9 100644 --- a/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/C2Operation.java +++ b/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/C2Operation.java @@ -32,7 +32,7 @@ public class C2Operation implements Serializable { private String identifier; private OperationType operation; private OperandType operand; - private Map args; + private Map args; private Set dependencies; @Schema(description = "A unique identifier for the operation", accessMode = Schema.AccessMode.READ_ONLY) @@ -79,11 +79,11 @@ public class C2Operation implements Serializable { "The syntax and semantics of these arguments is defined per operation in" + "the C2 protocol and possibly extended by an agent's implementation of the" + "C2 protocol.") - public Map getArgs() { + public Map getArgs() { return args; } - public void setArgs(Map args) { + public void setArgs(Map args) { this.args = args; } diff --git a/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/C2OperationAck.java b/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/C2OperationAck.java index 08729e1b1f..c5975183a5 100644 --- a/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/C2OperationAck.java +++ b/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/C2OperationAck.java @@ -18,7 +18,6 @@ package org.apache.nifi.c2.protocol.api; import io.swagger.v3.oas.annotations.media.Schema; - import java.io.Serializable; import java.util.Objects; @@ -41,6 +40,8 @@ public class C2OperationAck implements Serializable { @Schema(description = "Optionally, an ack can include flow info that is relevant to the operation being acknowledged") private FlowInfo flowInfo; + @Schema(description = "Optionally, an ack can include resource info that is relevant to the operation being acknowledged") + private ResourceInfo resourceInfo; public String getOperationId() { return operationId; @@ -82,6 +83,14 @@ public class C2OperationAck implements Serializable { this.flowInfo = flowInfo; } + public ResourceInfo getResourceInfo() { + return resourceInfo; + } + + public void setResourceInfo(ResourceInfo resourceInfo) { + this.resourceInfo = resourceInfo; + } + @Override public boolean equals(Object o) { if (this == o) { @@ -91,13 +100,14 @@ public class C2OperationAck implements Serializable { return false; } C2OperationAck that = (C2OperationAck) o; - return Objects.equals(operationId, that.operationId) && Objects.equals(operationState, that.operationState) && Objects.equals(deviceInfo, - that.deviceInfo) && Objects.equals(agentInfo, that.agentInfo) && Objects.equals(flowInfo, that.flowInfo); + return Objects.equals(operationId, that.operationId) && Objects.equals(operationState, that.operationState) + && Objects.equals(deviceInfo, that.deviceInfo) && Objects.equals(agentInfo, that.agentInfo) && Objects.equals(flowInfo, that.flowInfo) + && Objects.equals(resourceInfo, that.resourceInfo); } @Override public int hashCode() { - return Objects.hash(operationId, operationState, deviceInfo, agentInfo, flowInfo); + return Objects.hash(operationId, operationState, deviceInfo, agentInfo, flowInfo, resourceInfo); } @Override @@ -108,6 +118,7 @@ public class C2OperationAck implements Serializable { ", deviceInfo=" + deviceInfo + ", agentInfo=" + agentInfo + ", flowInfo=" + flowInfo + + ", resourceInfo=" + resourceInfo + '}'; } } diff --git a/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/OperandType.java b/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/OperandType.java index bc9538d9c3..f2fcff3f27 100644 --- a/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/OperandType.java +++ b/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/OperandType.java @@ -28,7 +28,8 @@ public enum OperandType { MANIFEST, REPOSITORY, PROPERTIES, - ASSET; + ASSET, + RESOURCE; public static Optional fromString(String value) { return Arrays.stream(values()) diff --git a/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/OperationType.java b/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/OperationType.java index 766bc6d303..e948e31f64 100644 --- a/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/OperationType.java +++ b/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/OperationType.java @@ -23,6 +23,7 @@ import static org.apache.nifi.c2.protocol.api.OperandType.CONNECTION; import static org.apache.nifi.c2.protocol.api.OperandType.DEBUG; import static org.apache.nifi.c2.protocol.api.OperandType.MANIFEST; import static org.apache.nifi.c2.protocol.api.OperandType.PROPERTIES; +import static org.apache.nifi.c2.protocol.api.OperandType.RESOURCE; import java.util.Arrays; import java.util.Set; @@ -44,6 +45,7 @@ public enum OperationType { PAUSE, REPLICATE, SUBSCRIBE, + SYNC(RESOURCE), TRANSFER(DEBUG); private final Set supportedOperands; diff --git a/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/ResourceInfo.java b/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/ResourceInfo.java new file mode 100644 index 0000000000..7affc3607e --- /dev/null +++ b/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/ResourceInfo.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.c2.protocol.api; + +import io.swagger.v3.oas.annotations.media.Schema; +import java.io.Serializable; +import java.util.Objects; + +public class ResourceInfo implements Serializable { + + private static final long serialVersionUID = 7576080726533697542L; + + private String hash; + + @Schema(description = "A global hash calculated from all the available resources on the agent", required = true) + public String getHash() { + return hash; + } + + public void setHash(String hash) { + this.hash = hash; + } + + @Override + public String toString() { + return "ResourceInfo{" + + "hash='" + hash + '\'' + + '}'; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + ResourceInfo resourceInfo = (ResourceInfo) o; + return Objects.equals(hash, resourceInfo.hash); + } + + @Override + public int hashCode() { + return Objects.hash(hash); + } +} diff --git a/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/ResourceItem.java b/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/ResourceItem.java new file mode 100644 index 0000000000..b1d2bcd6fa --- /dev/null +++ b/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/ResourceItem.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.c2.protocol.api; + +import io.swagger.v3.oas.annotations.media.Schema; +import java.io.Serializable; +import java.util.Objects; + +public class ResourceItem implements Serializable { + + private static final long serialVersionUID = 1L; + + private String resourceId; + private String resourceName; + private ResourceType resourceType; + private String resourcePath; + private String digest; + private String hashType; + private String url; + + @Schema(description = "The identifier of the resource to be synced") + public String getResourceId() { + return resourceId; + } + + public void setResourceId(String resourceId) { + this.resourceId = resourceId; + } + + @Schema(description = "The name of the asset to be synced") + public String getResourceName() { + return resourceName; + } + + public void setResourceName(String resourceName) { + this.resourceName = resourceName; + } + + @Schema(description = "The type of the asset to be synced") + public ResourceType getResourceType() { + return resourceType; + } + + public void setResourceType(ResourceType resourceType) { + this.resourceType = resourceType; + } + + @Schema(description = "The relative path of the asset on the agent") + public String getResourcePath() { + return resourcePath; + } + + public void setResourcePath(String resourcePath) { + this.resourcePath = resourcePath; + } + + @Schema(description = "The checksum hash value calculated for the particular asset") + public String getDigest() { + return digest; + } + + public void setDigest(String digest) { + this.digest = digest; + } + + @Schema(description = "The type of the hashing algorithm used to calculate the checksum") + public String getHashType() { + return hashType; + } + + public void setHashType(String hashType) { + this.hashType = hashType; + } + + @Schema(description = "The relative url of the asset, from where the agent can download from") + public String getUrl() { + return url; + } + + public void setUrl(String url) { + this.url = url; + } + + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + ResourceItem that = (ResourceItem) o; + return Objects.equals(resourceId, that.resourceId) && Objects.equals(resourceName, that.resourceName) && Objects.equals(resourceType, that.resourceType) + && Objects.equals(resourcePath, that.resourcePath) && Objects.equals(digest, that.digest) && Objects.equals(hashType, that.hashType) + && Objects.equals(url, that.url); + } + + @Override + public int hashCode() { + return Objects.hash(resourceId, resourceName, resourceType, resourcePath, digest, hashType, url); + } + + @Override + public String toString() { + return "ResourceItem{" + + "resourceId='" + resourceId + '\'' + + ", resourceName='" + resourceName + '\'' + + ", resourceType=" + resourceType + + ", resourcePath='" + resourcePath + '\'' + + ", digest='" + digest + '\'' + + ", hashType='" + hashType + '\'' + + ", url='" + url + '\'' + + '}'; + } +} + diff --git a/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/ResourceType.java b/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/ResourceType.java new file mode 100644 index 0000000000..c20b19d689 --- /dev/null +++ b/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/ResourceType.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.c2.protocol.api; + +public enum ResourceType { + ASSET, + EXTENSION +} diff --git a/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/ResourcesGlobalHash.java b/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/ResourcesGlobalHash.java new file mode 100644 index 0000000000..cf0f337183 --- /dev/null +++ b/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/ResourcesGlobalHash.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.c2.protocol.api; + +import io.swagger.v3.oas.annotations.media.Schema; +import java.io.Serializable; +import java.util.Objects; + +public class ResourcesGlobalHash implements Serializable { + + private static final long serialVersionUID = 1L; + + private String digest; + private String hashType; + + @Schema(description = "The calculated hash digest value of asset repository") + public String getDigest() { + return digest; + } + + public void setDigest(String digest) { + this.digest = digest; + } + + @Schema(description = "The type of the hashing algorithm used to calculate the hash digest") + public String getHashType() { + return hashType; + } + + public void setHashType(String hashType) { + this.hashType = hashType; + } + + @Override + public String toString() { + return "ResourcesGlobalHash{" + + "digest='" + digest + '\'' + + ", hashType='" + hashType + '\'' + + '}'; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + ResourcesGlobalHash that = (ResourcesGlobalHash) o; + return Objects.equals(digest, that.digest) && Objects.equals(hashType, that.hashType); + } + + @Override + public int hashCode() { + return Objects.hash(digest, hashType); + } +} diff --git a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NifiClientService.java b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NifiClientService.java index e0ddec368a..9b351e82f3 100644 --- a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NifiClientService.java +++ b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NifiClientService.java @@ -58,6 +58,7 @@ import static org.apache.nifi.util.NiFiProperties.SENSITIVE_PROPS_ALGORITHM; import static org.apache.nifi.util.NiFiProperties.SENSITIVE_PROPS_KEY; import com.fasterxml.jackson.databind.ObjectMapper; +import java.nio.file.Path; import java.util.List; import java.util.Map; import java.util.concurrent.ExecutorService; @@ -79,6 +80,7 @@ import org.apache.nifi.c2.client.service.operation.EmptyOperandPropertiesProvide import org.apache.nifi.c2.client.service.operation.OperandPropertiesProvider; import org.apache.nifi.c2.client.service.operation.OperationQueueDAO; import org.apache.nifi.c2.client.service.operation.SupportedOperationsProvider; +import org.apache.nifi.c2.client.service.operation.SyncResourceOperationHandler; import org.apache.nifi.c2.client.service.operation.TransferDebugOperationHandler; import org.apache.nifi.c2.client.service.operation.UpdateAssetOperationHandler; import org.apache.nifi.c2.client.service.operation.UpdateConfigurationOperationHandler; @@ -89,6 +91,7 @@ import org.apache.nifi.c2.protocol.api.AgentRepositories; import org.apache.nifi.c2.protocol.api.AgentRepositoryStatus; import org.apache.nifi.c2.protocol.api.FlowQueueStatus; import org.apache.nifi.c2.serializer.C2JacksonSerializer; +import org.apache.nifi.c2.serializer.C2Serializer; import org.apache.nifi.controller.FlowController; import org.apache.nifi.diagnostics.SystemDiagnostics; import org.apache.nifi.encrypt.PropertyEncryptorBuilder; @@ -101,6 +104,9 @@ import org.apache.nifi.minifi.c2.command.PropertiesPersister; import org.apache.nifi.minifi.c2.command.TransferDebugCommandHelper; import org.apache.nifi.minifi.c2.command.UpdateAssetCommandHelper; import org.apache.nifi.minifi.c2.command.UpdatePropertiesPropertyProvider; +import org.apache.nifi.minifi.c2.command.syncresource.DefaultSyncResourceStrategy; +import org.apache.nifi.minifi.c2.command.syncresource.FileResourceRepository; +import org.apache.nifi.minifi.c2.command.syncresource.ResourceRepository; import org.apache.nifi.minifi.commons.api.MiNiFiProperties; import org.apache.nifi.minifi.commons.service.FlowPropertyEncryptor; import org.apache.nifi.minifi.commons.service.StandardFlowEnrichService; @@ -147,13 +153,16 @@ public class C2NifiClientService { ); this.heartbeatPeriod = clientConfig.getHeartbeatPeriod(); this.flowController = flowController; - - C2HttpClient client = C2HttpClient.create(clientConfig, new C2JacksonSerializer()); + C2Serializer c2Serializer = new C2JacksonSerializer(); + ResourceRepository resourceRepository = + new FileResourceRepository(Path.of(clientConfig.getC2AssetDirectory()), niFiProperties.getNarAutoLoadDirectory().toPath(), + niFiProperties.getFlowConfigurationFileDir().toPath(), c2Serializer); + C2HttpClient client = C2HttpClient.create(clientConfig, c2Serializer); FlowIdHolder flowIdHolder = new FlowIdHolder(clientConfig.getConfDirectory()); - C2HeartbeatFactory heartbeatFactory = new C2HeartbeatFactory(clientConfig, flowIdHolder, new ManifestHashProvider()); + C2HeartbeatFactory heartbeatFactory = new C2HeartbeatFactory(clientConfig, flowIdHolder, new ManifestHashProvider(), resourceRepository::findResourcesGlobalHash); String bootstrapConfigFileLocation = niFiProperties.getProperty("nifi.minifi.bootstrap.file"); C2OperationHandlerProvider c2OperationHandlerProvider = c2OperationHandlerProvider(niFiProperties, flowController, flowService, flowIdHolder, - client, heartbeatFactory, bootstrapConfigFileLocation, clientConfig.getC2AssetDirectory()); + client, heartbeatFactory, bootstrapConfigFileLocation, clientConfig.getC2AssetDirectory(), c2Serializer, resourceRepository); this.supportedOperationsProvider = new SupportedOperationsProvider(c2OperationHandlerProvider.getHandlers()); @@ -206,7 +215,8 @@ public class C2NifiClientService { private C2OperationHandlerProvider c2OperationHandlerProvider(NiFiProperties niFiProperties, FlowController flowController, FlowService flowService, FlowIdHolder flowIdHolder, C2HttpClient client, C2HeartbeatFactory heartbeatFactory, - String bootstrapConfigFileLocation, String c2AssetDirectory) { + String bootstrapConfigFileLocation, String c2AssetDirectory, C2Serializer c2Serializer, + ResourceRepository resourceRepository) { OperandPropertiesProvider emptyOperandPropertiesProvider = new EmptyOperandPropertiesProvider(); TransferDebugCommandHelper transferDebugCommandHelper = new TransferDebugCommandHelper(niFiProperties); UpdateAssetCommandHelper updateAssetCommandHelper = new UpdateAssetCommandHelper(c2AssetDirectory); @@ -229,7 +239,8 @@ public class C2NifiClientService { transferDebugCommandHelper.debugBundleFiles(), transferDebugCommandHelper::excludeSensitiveText), UpdateAssetOperationHandler.create(client, emptyOperandPropertiesProvider, updateAssetCommandHelper::assetUpdatePrecondition, updateAssetCommandHelper::assetPersistFunction), - new UpdatePropertiesOperationHandler(updatePropertiesPropertyProvider, propertiesPersister::persistProperties) + new UpdatePropertiesOperationHandler(updatePropertiesPropertyProvider, propertiesPersister::persistProperties), + SyncResourceOperationHandler.create(client, emptyOperandPropertiesProvider, new DefaultSyncResourceStrategy(resourceRepository), c2Serializer) )); } diff --git a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/command/PropertiesPersister.java b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/command/PropertiesPersister.java index 3e68572b9b..93cd18e115 100644 --- a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/command/PropertiesPersister.java +++ b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/command/PropertiesPersister.java @@ -63,7 +63,7 @@ public class PropertiesPersister { this.bootstrapNewFile = new File(bootstrapFile.getParentFile() + "/" + BOOTSTRAP_UPDATED_FILE_NAME); } - public Boolean persistProperties(Map propertiesToUpdate) { + public Boolean persistProperties(Map propertiesToUpdate) { int propertyCountToUpdate = validateProperties(propertiesToUpdate); if (propertyCountToUpdate == 0) { return false; @@ -99,12 +99,12 @@ public class PropertiesPersister { return true; } - private int validateProperties(Map propertiesToUpdate) { + private int validateProperties(Map propertiesToUpdate) { Set updatableProperties = (Set) updatePropertiesPropertyProvider.getProperties().get(AVAILABLE_PROPERTIES); Map updatablePropertyMap = updatableProperties.stream().collect(Collectors.toMap(UpdatableProperty::getPropertyName, Function.identity())); int propertyCountToUpdate = 0; List validationErrors = new ArrayList<>(); - for (Map.Entry entry : propertiesToUpdate.entrySet()) { + for (Map.Entry entry : propertiesToUpdate.entrySet()) { UpdatableProperty updatableProperty = updatablePropertyMap.get(entry.getKey()); if (updatableProperty == null) { validationErrors.add(String.format("You can not update the {} property through C2 protocol", entry.getKey())); @@ -112,7 +112,7 @@ public class PropertiesPersister { } if (!Objects.equals(updatableProperty.getPropertyValue(), entry.getValue())) { if (!getValidator(updatableProperty.getValidator()) - .map(validator -> validator.validate(entry.getKey(), entry.getValue(), validationContext)) + .map(validator -> validator.validate(entry.getKey(), argToString(entry.getValue()), validationContext)) .map(ValidationResult::isValid) .orElse(true)) { validationErrors.add(String.format("Invalid value for %s", entry.getKey())); @@ -140,4 +140,8 @@ public class PropertiesPersister { } return Optional.empty(); } + + private String argToString(Object argument) { + return argument instanceof String arg ? arg : null; + } } diff --git a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/command/syncresource/DefaultSyncResourceStrategy.java b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/command/syncresource/DefaultSyncResourceStrategy.java new file mode 100644 index 0000000000..a837436ebd --- /dev/null +++ b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/command/syncresource/DefaultSyncResourceStrategy.java @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.minifi.c2.command.syncresource; + +import static java.nio.file.Files.copy; +import static java.nio.file.Files.createTempFile; +import static java.nio.file.StandardCopyOption.REPLACE_EXISTING; +import static java.util.Map.entry; +import static java.util.Optional.empty; +import static java.util.UUID.randomUUID; +import static java.util.function.Predicate.not; +import static org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.FULLY_APPLIED; +import static org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.NOT_APPLIED; +import static org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.NO_OPERATION; +import static org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.PARTIALLY_APPLIED; +import static org.apache.nifi.c2.protocol.api.ResourceType.ASSET; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.Path; +import java.util.List; +import java.util.Map.Entry; +import java.util.Optional; +import java.util.Set; +import java.util.function.BiFunction; +import java.util.function.Function; +import org.apache.nifi.c2.client.service.operation.SyncResourceStrategy; +import org.apache.nifi.c2.protocol.api.C2OperationState.OperationState; +import org.apache.nifi.c2.protocol.api.ResourceItem; +import org.apache.nifi.c2.protocol.api.ResourcesGlobalHash; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class DefaultSyncResourceStrategy implements SyncResourceStrategy { + + private static final Logger LOG = LoggerFactory.getLogger(DefaultSyncResourceStrategy.class); + + private static final Set> SUCCESS_RESULT_PAIRS = Set.of( + entry(NO_OPERATION, NO_OPERATION), + entry(NO_OPERATION, FULLY_APPLIED), + entry(FULLY_APPLIED, NO_OPERATION), + entry(FULLY_APPLIED, FULLY_APPLIED)); + + private static final Set> FAILED_RESULT_PAIRS = Set.of( + entry(NO_OPERATION, NOT_APPLIED), + entry(NOT_APPLIED, NO_OPERATION), + entry(NOT_APPLIED, NOT_APPLIED)); + + private static final String CHANGE_TO_PARENT_DIR_PATH_SEGMENT = ".."; + + private final ResourceRepository resourceRepository; + + public DefaultSyncResourceStrategy(ResourceRepository resourceRepository) { + this.resourceRepository = resourceRepository; + } + + @Override + public OperationState synchronizeResourceRepository(ResourcesGlobalHash c2GlobalHash, List c2ServerItems, + BiFunction>, Optional> resourceDownloadFunction, + Function> urlEnrichFunction) { + Set c2Items = Set.copyOf(c2ServerItems); + Set agentItems = Set.copyOf(resourceRepository.findAllResourceItems()); + + OperationState deleteResult = deleteItems(c2Items, agentItems); + OperationState saveResult = saveNewItems(c2Items, agentItems, resourceDownloadFunction, urlEnrichFunction); + + Entry resultPair = entry(deleteResult, saveResult); + + return SUCCESS_RESULT_PAIRS.contains(resultPair) + ? saveGlobalHash(c2GlobalHash, deleteResult, saveResult) + : FAILED_RESULT_PAIRS.contains(resultPair) ? NOT_APPLIED : PARTIALLY_APPLIED; + } + + private OperationState saveNewItems(Set c2Items, Set agentItems, + BiFunction>, Optional> resourceDownloadFunction, + Function> urlEnrichFunction) { + List newItems = c2Items.stream().filter(not(agentItems::contains)).toList(); + if (newItems.isEmpty()) { + return NO_OPERATION; + } + + List addedItems = newItems.stream() + .filter(this::validate) + .map(downloadIfNotPresentAndAddToRepository(resourceDownloadFunction, urlEnrichFunction)) + .flatMap(Optional::stream) + .toList(); + + return addedItems.isEmpty() + ? NOT_APPLIED + : newItems.size() == addedItems.size() ? FULLY_APPLIED : PARTIALLY_APPLIED; + } + + private Function> downloadIfNotPresentAndAddToRepository( + BiFunction>, Optional> resourceDownloadFunction, Function> urlEnrichFunction) { + return resourceItem -> resourceRepository.resourceItemBinaryPresent(resourceItem) + ? resourceRepository.addResourceItem(resourceItem) + : urlEnrichFunction.apply(resourceItem.getUrl()) + .flatMap(enrichedUrl -> resourceDownloadFunction.apply(enrichedUrl, this::persistToTemporaryLocation)) + .flatMap(tempResourcePath -> resourceRepository.addResourceItem(resourceItem, tempResourcePath)); + } + + private boolean validate(ResourceItem resourceItem) { + if (resourceItem.getResourcePath() != null + && resourceItem.getResourceType() == ASSET + && resourceItem.getResourcePath().contains(CHANGE_TO_PARENT_DIR_PATH_SEGMENT)) { + LOG.error("Resource path should not contain '..' path segment in {}", resourceItem); + return false; + } + return true; + } + + private Optional persistToTemporaryLocation(InputStream inputStream) { + String tempResourceId = randomUUID().toString(); + try { + Path tempFile = createTempFile(tempResourceId, null); + copy(inputStream, tempFile, REPLACE_EXISTING); + return Optional.of(tempFile); + } catch (IOException e) { + LOG.error("Unable to download resource. Will retry in next heartbeat iteration", e); + return empty(); + } + } + + private OperationState deleteItems(Set c2Items, Set agentItems) { + List toDeleteItems = agentItems.stream().filter(not(c2Items::contains)).toList(); + if (toDeleteItems.isEmpty()) { + return NO_OPERATION; + } + + List deletedItems = toDeleteItems.stream() + .map(resourceRepository::deleteResourceItem) + .flatMap(Optional::stream) + .toList(); + + return deletedItems.isEmpty() + ? NOT_APPLIED + : deletedItems.size() == toDeleteItems.size() ? FULLY_APPLIED : PARTIALLY_APPLIED; + } + + private OperationState saveGlobalHash(ResourcesGlobalHash resourcesGlobalHash, OperationState deleteResult, OperationState saveResult) { + boolean isGlobalHashRefreshOnly = deleteResult == NO_OPERATION && saveResult == NO_OPERATION; + return resourceRepository.saveResourcesGlobalHash(resourcesGlobalHash) + .map(unused -> FULLY_APPLIED) + .orElse(isGlobalHashRefreshOnly ? NOT_APPLIED : PARTIALLY_APPLIED); + } +} diff --git a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/command/syncresource/FileResourceRepository.java b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/command/syncresource/FileResourceRepository.java new file mode 100644 index 0000000000..1874728782 --- /dev/null +++ b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/command/syncresource/FileResourceRepository.java @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.minifi.c2.command.syncresource; + +import static java.nio.file.Files.copy; +import static java.nio.file.Files.createDirectories; +import static java.nio.file.Files.deleteIfExists; +import static java.nio.file.Files.exists; +import static java.nio.file.Files.newInputStream; +import static java.nio.file.Files.readString; +import static java.nio.file.Files.writeString; +import static java.nio.file.StandardCopyOption.COPY_ATTRIBUTES; +import static java.nio.file.StandardCopyOption.REPLACE_EXISTING; +import static java.nio.file.StandardOpenOption.CREATE; +import static java.nio.file.StandardOpenOption.SYNC; +import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING; +import static java.nio.file.StandardOpenOption.WRITE; +import static java.util.Optional.empty; +import static java.util.Optional.of; +import static java.util.Optional.ofNullable; +import static java.util.function.Predicate.not; +import static java.util.stream.Collectors.toList; +import static org.apache.commons.codec.digest.DigestUtils.md5Hex; +import static org.apache.commons.codec.digest.DigestUtils.sha1Hex; +import static org.apache.commons.codec.digest.DigestUtils.sha256Hex; +import static org.apache.commons.codec.digest.DigestUtils.sha512Hex; +import static org.apache.commons.codec.digest.MessageDigestAlgorithms.MD5; +import static org.apache.commons.codec.digest.MessageDigestAlgorithms.SHA_1; +import static org.apache.commons.codec.digest.MessageDigestAlgorithms.SHA_256; +import static org.apache.commons.codec.digest.MessageDigestAlgorithms.SHA_512; +import static org.apache.commons.io.file.PathUtils.createParentDirectories; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import org.apache.nifi.c2.protocol.api.ResourceItem; +import org.apache.nifi.c2.protocol.api.ResourcesGlobalHash; +import org.apache.nifi.c2.serializer.C2Serializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class FileResourceRepository implements ResourceRepository { + + static final String ASSET_REPOSITORY_DIRECTORY = "repository"; + static final String RESOURCE_REPOSITORY_FILE_NAME = "resources.json"; + + private static final Logger LOG = LoggerFactory.getLogger(FileResourceRepository.class); + private static final ResourceRepositoryDescriptor EMTPY_HOLDER = new ResourceRepositoryDescriptor(new ResourcesGlobalHash(), List.of()); + + private final Path assetRepositoryDirectory; + private final Path extensionDirectory; + private final Path resourceRepositoryFile; + private final C2Serializer c2Serializer; + + private ResourceRepositoryDescriptor resourceRepositoryDescriptor; + + public FileResourceRepository(Path assetDirectory, Path extensionDirectory, Path configDirectory, C2Serializer c2Serializer) { + this.resourceRepositoryFile = configDirectory.resolve(RESOURCE_REPOSITORY_FILE_NAME); + this.c2Serializer = c2Serializer; + this.assetRepositoryDirectory = assetDirectory.resolve(ASSET_REPOSITORY_DIRECTORY); + this.extensionDirectory = extensionDirectory; + initialize(); + } + + @Override + public synchronized ResourcesGlobalHash findResourcesGlobalHash() { + return resourceRepositoryDescriptor.resourcesGlobalHash(); + } + + @Override + public synchronized Optional saveResourcesGlobalHash(ResourcesGlobalHash resourcesGlobalHash) { + ResourceRepositoryDescriptor newRepositoryDescriptor = new ResourceRepositoryDescriptor(resourcesGlobalHash, resourceRepositoryDescriptor.resourceItems()); + try { + persist(newRepositoryDescriptor); + return Optional.of(resourcesGlobalHash); + } catch (IOException e) { + LOG.error("Unable to save global resource hash data", e); + return empty(); + } + } + + @Override + public synchronized List findAllResourceItems() { + return resourceRepositoryDescriptor.resourceItems(); + } + + @Override + public synchronized boolean resourceItemBinaryPresent(ResourceItem resourceItem) { + Path path = resourcePath(resourceItem); + return exists(path) && calculateDigest(path, resourceItem.getHashType()).equals(resourceItem.getDigest()); + } + + @Override + public synchronized Optional addResourceItem(ResourceItem resourceItem) { + try { + List newItems = new ArrayList<>(resourceRepositoryDescriptor.resourceItems()); + newItems.add(resourceItem); + ResourceRepositoryDescriptor newRepositoryDescriptor = new ResourceRepositoryDescriptor(resourceRepositoryDescriptor.resourcesGlobalHash(), newItems); + persist(newRepositoryDescriptor); + return of(resourceItem); + } catch (IOException e) { + LOG.error("Unable to persist repository metadata", e); + return empty(); + } + } + + @Override + public synchronized Optional addResourceItem(ResourceItem resourceItem, Path source) { + Path resourcePath = resourcePath(resourceItem); + try { + createParentDirectories(resourcePath); + copy(source, resourcePath, REPLACE_EXISTING, COPY_ATTRIBUTES); + } catch (IOException e) { + LOG.error("Unable to move resource to final location. Syncing this asset will be retried in next heartbeat iteration", e); + return empty(); + } finally { + deleteSilently(source, "Unable to clear temporary file"); + } + + Optional addedItem = addResourceItem(resourceItem); + if (addedItem.isEmpty()) { + deleteSilently(resourcePath, "Unable to cleanup resource file"); + } + return addedItem; + } + + @Override + public synchronized Optional deleteResourceItem(ResourceItem resourceItem) { + List truncatedItems = resourceRepositoryDescriptor.resourceItems() + .stream() + .filter(syncAssetItem -> !syncAssetItem.getResourceId().equals(resourceItem.getResourceId())) + .collect(toList()); + + ResourceRepositoryDescriptor newRepositoryDescriptor = new ResourceRepositoryDescriptor(resourceRepositoryDescriptor.resourcesGlobalHash(), truncatedItems); + try { + persist(newRepositoryDescriptor); + } catch (Exception e) { + LOG.error("Unable to persist repository metadata", e); + return empty(); + } + + Path resourcePath = resourcePath(resourceItem); + deleteSilently(resourcePath, "Unable to delete resource file"); + + return Optional.of(resourceItem); + } + + private void initialize() { + try { + createDirectories(assetRepositoryDirectory); + createDirectories(extensionDirectory); + + if (!exists(resourceRepositoryFile)) { + persist(EMTPY_HOLDER); + } else { + load(); + } + } catch (IOException e) { + throw new RuntimeException("Unable to initialize resource repository", e); + } + } + + private void persist(ResourceRepositoryDescriptor descriptor) throws IOException { + String serializedDescriptor = + c2Serializer.serialize(descriptor).orElseThrow(() -> new IllegalStateException("Unable to serialize repository descriptor object")); + writeString(resourceRepositoryFile, serializedDescriptor, CREATE, TRUNCATE_EXISTING, WRITE, SYNC); + resourceRepositoryDescriptor = descriptor; + } + + private void load() throws IOException { + String rawResourceRepository = readString(resourceRepositoryFile); + resourceRepositoryDescriptor = c2Serializer.deserialize(rawResourceRepository, ResourceRepositoryDescriptor.class) + .orElseThrow(() -> new IllegalStateException("Unable to deserialize repository descriptor object")); + } + + private Path resourcePath(ResourceItem resourceItem) { + return switch (resourceItem.getResourceType()) { + case ASSET -> ofNullable(resourceItem.getResourcePath()) + .filter(not(String::isBlank)) + .map(assetRepositoryDirectory::resolve) + .orElse(assetRepositoryDirectory) + .resolve(resourceItem.getResourceName()); + case EXTENSION -> extensionDirectory.resolve(resourceItem.getResourceName()); + }; + } + + private void deleteSilently(Path path, String errorMessage) { + try { + deleteIfExists(path); + } catch (IOException e) { + LOG.error(errorMessage, e); + } + } + + private String calculateDigest(Path path, String digestAlgorithm) { + try (InputStream inputStream = newInputStream(path)) { + return switch (digestAlgorithm) { + case MD5 -> md5Hex(inputStream); + case SHA_1 -> sha1Hex(inputStream); + case SHA_256 -> sha256Hex(inputStream); + case SHA_512 -> sha512Hex(inputStream); + default -> throw new Exception("Unsupported digest algorithm: " + digestAlgorithm); + }; + } catch (Exception e) { + throw new RuntimeException("Unable to calculate digest for resource", e); + } + } + + record ResourceRepositoryDescriptor(ResourcesGlobalHash resourcesGlobalHash, List resourceItems) { + } +} diff --git a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/command/syncresource/ResourceRepository.java b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/command/syncresource/ResourceRepository.java new file mode 100644 index 0000000000..db431d361d --- /dev/null +++ b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/command/syncresource/ResourceRepository.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.minifi.c2.command.syncresource; + +import java.nio.file.Path; +import java.util.List; +import java.util.Optional; +import org.apache.nifi.c2.protocol.api.ResourcesGlobalHash; +import org.apache.nifi.c2.protocol.api.ResourceItem; + +public interface ResourceRepository { + + ResourcesGlobalHash findResourcesGlobalHash(); + + Optional saveResourcesGlobalHash(ResourcesGlobalHash resourcesGlobalHash); + + List findAllResourceItems(); + + boolean resourceItemBinaryPresent(ResourceItem resourceItem); + + Optional addResourceItem(ResourceItem resourceItem); + + Optional addResourceItem(ResourceItem resourceItem, Path source); + + Optional deleteResourceItem(ResourceItem resourceItem); +} diff --git a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/test/java/org/apache/nifi/minifi/c2/command/syncresource/DefaultSyncResourceStrategyTest.java b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/test/java/org/apache/nifi/minifi/c2/command/syncresource/DefaultSyncResourceStrategyTest.java new file mode 100644 index 0000000000..c54c828c54 --- /dev/null +++ b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/test/java/org/apache/nifi/minifi/c2/command/syncresource/DefaultSyncResourceStrategyTest.java @@ -0,0 +1,312 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.minifi.c2.command.syncresource; + +import static java.lang.Boolean.TRUE; +import static java.nio.file.Files.createTempFile; +import static java.util.Optional.empty; +import static java.util.Optional.ofNullable; +import static org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.FULLY_APPLIED; +import static org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.NOT_APPLIED; +import static org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.PARTIALLY_APPLIED; +import static org.apache.nifi.c2.protocol.api.ResourceType.ASSET; +import static org.apache.nifi.c2.protocol.api.ResourceType.EXTENSION; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mockStatic; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.List; +import java.util.Optional; +import java.util.function.BiFunction; +import java.util.function.Function; +import org.apache.nifi.c2.protocol.api.C2OperationState.OperationState; +import org.apache.nifi.c2.protocol.api.ResourceItem; +import org.apache.nifi.c2.protocol.api.ResourceType; +import org.apache.nifi.c2.protocol.api.ResourcesGlobalHash; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.MockedStatic; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +public class DefaultSyncResourceStrategyTest { + + private static final ResourcesGlobalHash C2_GLOBAL_HASH = resourcesGlobalHash("digest1"); + + private static final String FAIL_DOWNLOAD_URL = "fail"; + + private static final BiFunction>, Optional> URL_TO_CONTENT_DOWNLOAD_FUNCTION = + (url, persistFunction) -> url.endsWith(FAIL_DOWNLOAD_URL) ? empty() : persistFunction.apply(new ByteArrayInputStream(url.getBytes())); + + private static String ENRICH_PREFIX = "pre_"; + private static final Function> PREFIXING_ENRICH_FUNCTION = url -> ofNullable(url).map(arg -> ENRICH_PREFIX + arg); + + @Mock + private ResourceRepository mockResourceRepository; + + private DefaultSyncResourceStrategy testSyncResourceStrategy; + + @BeforeEach + public void setup() { + testSyncResourceStrategy = new DefaultSyncResourceStrategy(mockResourceRepository); + } + + @Test + public void testAddingNewItems() { + List c2Items = List.of( + resourceItem("resource1", "url1", null, ASSET), + resourceItem("resource2", "url2", "", ASSET), + resourceItem("resource3", "url3", "path3", ASSET), + resourceItem("resource4", "url4", null, EXTENSION), + resourceItem("resource5", "url5", "path5", EXTENSION) + ); + when(mockResourceRepository.findAllResourceItems()).thenReturn(List.of()); + when(mockResourceRepository.saveResourcesGlobalHash(C2_GLOBAL_HASH)).thenReturn(Optional.of(C2_GLOBAL_HASH)); + c2Items.forEach(resourceItem -> { + try { + when(mockResourceRepository.addResourceItem(eq(resourceItem), any())).thenReturn(Optional.of(resourceItem)); + } catch (Exception e) { + } + }); + + OperationState resultState = + testSyncResourceStrategy.synchronizeResourceRepository(C2_GLOBAL_HASH, c2Items, URL_TO_CONTENT_DOWNLOAD_FUNCTION, PREFIXING_ENRICH_FUNCTION); + + assertEquals(FULLY_APPLIED, resultState); + try { + verify(mockResourceRepository, never()).deleteResourceItem(any()); + } catch (Exception e) { + } + } + + @Test + public void testAddingNewItemWhenBinaryPresent() { + ResourceItem resourceItem = resourceItem("resource1", "url1", null, ASSET); + when(mockResourceRepository.findAllResourceItems()).thenReturn(List.of()); + when(mockResourceRepository.saveResourcesGlobalHash(C2_GLOBAL_HASH)).thenReturn(Optional.of(C2_GLOBAL_HASH)); + when(mockResourceRepository.addResourceItem(resourceItem)).thenReturn(Optional.of(resourceItem)); + when(mockResourceRepository.resourceItemBinaryPresent(resourceItem)).thenReturn(TRUE); + + OperationState resultState = + testSyncResourceStrategy.synchronizeResourceRepository(C2_GLOBAL_HASH, List.of(resourceItem), URL_TO_CONTENT_DOWNLOAD_FUNCTION, PREFIXING_ENRICH_FUNCTION); + + assertEquals(FULLY_APPLIED, resultState); + try { + verify(mockResourceRepository, never()).deleteResourceItem(any()); + } catch (Exception e) { + } + } + + @Test + public void testAddingNewItemFailureWhenTypeIsAssetAndPathContainsDoubleDots() { + List c2Items = List.of( + resourceItem("resource1", "valid_url", "../path", ASSET) + ); + when(mockResourceRepository.findAllResourceItems()).thenReturn(List.of()); + + OperationState resultState = + testSyncResourceStrategy.synchronizeResourceRepository(C2_GLOBAL_HASH, c2Items, URL_TO_CONTENT_DOWNLOAD_FUNCTION, PREFIXING_ENRICH_FUNCTION); + + assertEquals(NOT_APPLIED, resultState); + try { + verify(mockResourceRepository, never()).deleteResourceItem(any()); + verify(mockResourceRepository, never()).addResourceItem(any()); + verify(mockResourceRepository, never()).addResourceItem(any(), any()); + verify(mockResourceRepository, never()).saveResourcesGlobalHash(C2_GLOBAL_HASH); + } catch (Exception e) { + } + } + + @Test + public void testAddingNewItemFailureDueToIssueWithUrlEnrichment() { + List c2Items = List.of( + resourceItem("resource1", null, null, ASSET) + ); + when(mockResourceRepository.findAllResourceItems()).thenReturn(List.of()); + + OperationState resultState = + testSyncResourceStrategy.synchronizeResourceRepository(C2_GLOBAL_HASH, c2Items, URL_TO_CONTENT_DOWNLOAD_FUNCTION, PREFIXING_ENRICH_FUNCTION); + + assertEquals(NOT_APPLIED, resultState); + try { + verify(mockResourceRepository, never()).deleteResourceItem(any()); + verify(mockResourceRepository, never()).saveResourcesGlobalHash(C2_GLOBAL_HASH); + } catch (Exception e) { + } + } + + @Test + public void testAddingNewItemFailureDueToIssueInDownloadFunction() { + List c2Items = List.of( + resourceItem("resource1", FAIL_DOWNLOAD_URL, null, ASSET) + ); + when(mockResourceRepository.findAllResourceItems()).thenReturn(List.of()); + + OperationState resultState = + testSyncResourceStrategy.synchronizeResourceRepository(C2_GLOBAL_HASH, c2Items, URL_TO_CONTENT_DOWNLOAD_FUNCTION, PREFIXING_ENRICH_FUNCTION); + + assertEquals(NOT_APPLIED, resultState); + try { + verify(mockResourceRepository, never()).deleteResourceItem(any()); + verify(mockResourceRepository, never()).saveResourcesGlobalHash(C2_GLOBAL_HASH); + } catch (Exception e) { + } + } + + @Test + public void testAddingNewItemFailureDueToIssueInPersistFunction() { + List c2Items = List.of( + resourceItem("resource1", "url1", null, ASSET) + ); + when(mockResourceRepository.findAllResourceItems()).thenReturn(List.of()); + + try (MockedStatic mockedFiles = mockStatic(Files.class)) { + mockedFiles.when(() -> createTempFile(anyString(), eq(null))).thenThrow(IOException.class); + + OperationState resultState = + testSyncResourceStrategy.synchronizeResourceRepository(C2_GLOBAL_HASH, c2Items, URL_TO_CONTENT_DOWNLOAD_FUNCTION, PREFIXING_ENRICH_FUNCTION); + + assertEquals(NOT_APPLIED, resultState); + try { + verify(mockResourceRepository, never()).deleteResourceItem(any()); + verify(mockResourceRepository, never()).saveResourcesGlobalHash(C2_GLOBAL_HASH); + } catch (Exception e) { + } + } + } + + @Test + public void testAddingNewItemFailureDueToIssueWhenUpdatingRepository() { + ResourceItem resourceItem = resourceItem("resource1", "url1", null, ASSET); + List c2Items = List.of(resourceItem); + when(mockResourceRepository.findAllResourceItems()).thenReturn(List.of()); + try { + when(mockResourceRepository.addResourceItem(resourceItem)).thenThrow(Exception.class); + } catch (Exception e) { + } + + OperationState resultState = + testSyncResourceStrategy.synchronizeResourceRepository(C2_GLOBAL_HASH, c2Items, URL_TO_CONTENT_DOWNLOAD_FUNCTION, PREFIXING_ENRICH_FUNCTION); + + assertEquals(NOT_APPLIED, resultState); + try { + verify(mockResourceRepository, never()).deleteResourceItem(any()); + verify(mockResourceRepository, never()).saveResourcesGlobalHash(C2_GLOBAL_HASH); + } catch (Exception e) { + } + } + + @Test + public void testDeletingAllItems() { + List c2Items = List.of(); + List agentItems = List.of( + resourceItem("resource1", "url1", null, ASSET), + resourceItem("resource2", "url2", null, ASSET), + resourceItem("resource3", "url3", null, EXTENSION) + ); + when(mockResourceRepository.findAllResourceItems()).thenReturn(agentItems); + when(mockResourceRepository.saveResourcesGlobalHash(C2_GLOBAL_HASH)).thenReturn(Optional.of(C2_GLOBAL_HASH)); + agentItems.forEach(agentItem -> { + try { + when(mockResourceRepository.deleteResourceItem(agentItem)).thenReturn(Optional.of(agentItem)); + } catch (Exception e) { + } + }); + + OperationState resultState = + testSyncResourceStrategy.synchronizeResourceRepository(C2_GLOBAL_HASH, c2Items, URL_TO_CONTENT_DOWNLOAD_FUNCTION, PREFIXING_ENRICH_FUNCTION); + + assertEquals(FULLY_APPLIED, resultState); + try { + verify(mockResourceRepository, never()).addResourceItem(any()); + } catch (Exception e) { + } + } + + @Test + public void testDeleteFailureDueToIssueWithUpdatingRepository() { + List c2Items = List.of(); + List agentItems = List.of( + resourceItem("resource1", "url1", null, ASSET) + ); + when(mockResourceRepository.findAllResourceItems()).thenReturn(agentItems); + agentItems.forEach(agentItem -> { + try { + when(mockResourceRepository.deleteResourceItem(agentItem)).thenThrow(Exception.class); + } catch (Exception e) { + } + }); + + OperationState resultState = + testSyncResourceStrategy.synchronizeResourceRepository(C2_GLOBAL_HASH, c2Items, URL_TO_CONTENT_DOWNLOAD_FUNCTION, PREFIXING_ENRICH_FUNCTION); + + assertEquals(NOT_APPLIED, resultState); + try { + verify(mockResourceRepository, never()).addResourceItem(any()); + verify(mockResourceRepository, never()).saveResourcesGlobalHash(C2_GLOBAL_HASH); + } catch (Exception e) { + } + } + + @Test + public void testAddFileSuccessfulButUpdateGlobalHashFails() { + ResourceItem c2Item = resourceItem("resource1", "url1", null, ASSET); + when(mockResourceRepository.findAllResourceItems()).thenReturn(List.of()); + try { + when(mockResourceRepository.addResourceItem(eq(c2Item), any())).thenReturn(Optional.of(c2Item)); + when(mockResourceRepository.saveResourcesGlobalHash(C2_GLOBAL_HASH)).thenThrow(Exception.class); + } catch (Exception e) { + } + + OperationState resultState = + testSyncResourceStrategy.synchronizeResourceRepository(C2_GLOBAL_HASH, List.of(c2Item), URL_TO_CONTENT_DOWNLOAD_FUNCTION, PREFIXING_ENRICH_FUNCTION); + + assertEquals(PARTIALLY_APPLIED, resultState); + try { + verify(mockResourceRepository, never()).deleteResourceItem(any()); + } catch (Exception e) { + } + } + + private static ResourcesGlobalHash resourcesGlobalHash(String digest) { + ResourcesGlobalHash resourcesGlobalHash = new ResourcesGlobalHash(); + resourcesGlobalHash.setDigest(digest); + return resourcesGlobalHash; + } + + private ResourceItem resourceItem(String name, String url, String path, ResourceType resourceType) { + ResourceItem resourceItem = new ResourceItem(); + resourceItem.setResourceName(name); + resourceItem.setUrl(url); + resourceItem.setResourceType(resourceType); + resourceItem.setResourcePath(path); + return resourceItem; + } +} diff --git a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/test/java/org/apache/nifi/minifi/c2/command/syncresource/FileResourceRepositoryTest.java b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/test/java/org/apache/nifi/minifi/c2/command/syncresource/FileResourceRepositoryTest.java new file mode 100644 index 0000000000..608d29369f --- /dev/null +++ b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/test/java/org/apache/nifi/minifi/c2/command/syncresource/FileResourceRepositoryTest.java @@ -0,0 +1,450 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.minifi.c2.command.syncresource; + +import static java.nio.file.Files.createDirectories; +import static java.nio.file.Files.exists; +import static java.nio.file.Files.readString; +import static java.nio.file.Files.writeString; +import static java.nio.file.StandardOpenOption.CREATE; +import static java.nio.file.StandardOpenOption.SYNC; +import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING; +import static java.nio.file.StandardOpenOption.WRITE; +import static java.util.UUID.randomUUID; +import static org.apache.commons.codec.digest.DigestUtils.sha512Hex; +import static org.apache.commons.lang3.StringUtils.isBlank; +import static org.apache.nifi.c2.protocol.api.ResourceType.ASSET; +import static org.apache.nifi.c2.protocol.api.ResourceType.EXTENSION; +import static org.apache.nifi.minifi.c2.command.syncresource.FileResourceRepository.ASSET_REPOSITORY_DIRECTORY; +import static org.apache.nifi.minifi.c2.command.syncresource.FileResourceRepository.RESOURCE_REPOSITORY_FILE_NAME; +import static org.apache.nifi.util.StringUtils.EMPTY; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertIterableEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrowsExactly; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mockStatic; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.List; +import java.util.Optional; +import org.apache.nifi.c2.protocol.api.ResourceItem; +import org.apache.nifi.c2.protocol.api.ResourceType; +import org.apache.nifi.c2.protocol.api.ResourcesGlobalHash; +import org.apache.nifi.c2.serializer.C2JacksonSerializer; +import org.apache.nifi.c2.serializer.C2Serializer; +import org.apache.nifi.minifi.c2.command.syncresource.FileResourceRepository.ResourceRepositoryDescriptor; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.io.TempDir; +import org.mockito.MockedStatic; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +public class FileResourceRepositoryTest { + + private static final String RESOURCE_BINARY_CONTENT = "content"; + + @TempDir + private Path testBaseDirectory; + + private C2Serializer c2Serializer; + + private Path assetDirectory; + private Path assetRepositoryDirectory; + private Path extensionDirectory; + private Path repositoryFile; + private Path configDirectoryPath; + + @BeforeEach + public void setup() throws IOException { + c2Serializer = new C2JacksonSerializer(); + configDirectoryPath = testBaseDirectory.resolve("conf"); + createDirectories(configDirectoryPath); + assetDirectory = testBaseDirectory.resolve("assets"); + assetRepositoryDirectory = assetDirectory.resolve(ASSET_REPOSITORY_DIRECTORY); + createDirectories(assetRepositoryDirectory); + extensionDirectory = testBaseDirectory.resolve("extensions"); + createDirectories(extensionDirectory); + repositoryFile = configDirectoryPath.resolve(RESOURCE_REPOSITORY_FILE_NAME); + } + + @Test + public void testRepositoryInitializesWithEmptyContent() throws IOException { + FileResourceRepository testRepository = createTestRepository(); + + assertRepositoryInMemoryContentEqualsPersistedContent(testRepository); + assertNull(testRepository.findResourcesGlobalHash().getDigest()); + assertNull(testRepository.findResourcesGlobalHash().getHashType()); + assertEquals(List.of(), testRepository.findAllResourceItems()); + } + + @Test + public void testRepositoryInitializesWithExistingContent() throws IOException { + ResourcesGlobalHash resourcesGlobalHash = resourcesGlobalHash("digest", "hashType"); + ResourceItem resourceItem = resourceItem("resourceId", null, ASSET); + ResourceRepositoryDescriptor initialRepositoryDescriptor = new ResourceRepositoryDescriptor(resourcesGlobalHash, List.of(resourceItem)); + saveRepository(initialRepositoryDescriptor); + + FileResourceRepository testRepository = createTestRepository(); + + assertRepositoryInMemoryContentEqualsPersistedContent(testRepository); + assertEquals("digest", testRepository.findResourcesGlobalHash().getDigest()); + assertEquals("hashType", testRepository.findResourcesGlobalHash().getHashType()); + assertEquals(1, testRepository.findAllResourceItems().size()); + assertEquals("resourceId", testRepository.findAllResourceItems().getFirst().getResourceId()); + } + + @Test + public void testRepositoryInitializationFailure() { + try (MockedStatic mockedFiles = mockStatic(Files.class)) { + mockedFiles.when(() -> createDirectories(any())).thenThrow(new IOException()); + assertThrowsExactly(RuntimeException.class, this::createTestRepository); + } + } + + @Test + public void testSaveGlobalHashSuccess() throws IOException { + ResourcesGlobalHash originalGlobalHash = resourcesGlobalHash("digest1", "hashType1"); + ResourceRepositoryDescriptor initialRepositoryDescriptor = new ResourceRepositoryDescriptor(originalGlobalHash, List.of()); + saveRepository(initialRepositoryDescriptor); + + FileResourceRepository testRepository = createTestRepository(); + + assertRepositoryInMemoryContentEqualsPersistedContent(testRepository); + assertEquals("digest1", testRepository.findResourcesGlobalHash().getDigest()); + assertEquals("hashType1", testRepository.findResourcesGlobalHash().getHashType()); + assertTrue(testRepository.findAllResourceItems().isEmpty()); + + ResourcesGlobalHash updatedGlobalHash = resourcesGlobalHash("digest2", "hashType2"); + testRepository.saveResourcesGlobalHash(updatedGlobalHash); + + assertRepositoryInMemoryContentEqualsPersistedContent(testRepository); + assertEquals("digest2", testRepository.findResourcesGlobalHash().getDigest()); + assertEquals("hashType2", testRepository.findResourcesGlobalHash().getHashType()); + assertTrue(testRepository.findAllResourceItems().isEmpty()); + } + + @Test + public void testSaveGlobalHashSuccessFailure() throws IOException { + ResourcesGlobalHash originalGlobalHash = resourcesGlobalHash("digest1", "hashType1"); + ResourceRepositoryDescriptor initialRepositoryDescriptor = new ResourceRepositoryDescriptor(originalGlobalHash, List.of()); + saveRepository(initialRepositoryDescriptor); + + FileResourceRepository testRepository = createTestRepository(); + + assertRepositoryInMemoryContentEqualsPersistedContent(testRepository); + assertEquals("digest1", testRepository.findResourcesGlobalHash().getDigest()); + assertEquals("hashType1", testRepository.findResourcesGlobalHash().getHashType()); + assertTrue(testRepository.findAllResourceItems().isEmpty()); + + Optional result; + try (MockedStatic mockedFiles = mockStatic(Files.class)) { + mockedFiles.when(() -> writeString(any(), any(), eq(CREATE), eq(TRUNCATE_EXISTING), eq(WRITE), eq(SYNC))).thenThrow(new IOException()); + + ResourcesGlobalHash updatedGlobalHash = resourcesGlobalHash("digest2", "hashType2"); + result = testRepository.saveResourcesGlobalHash(updatedGlobalHash); + + } + assertTrue(result.isEmpty()); + assertRepositoryInMemoryContentEqualsPersistedContent(testRepository); + assertEquals("digest1", testRepository.findResourcesGlobalHash().getDigest()); + assertEquals("hashType1", testRepository.findResourcesGlobalHash().getHashType()); + assertTrue(testRepository.findAllResourceItems().isEmpty()); + } + + @Test + public void testResourceItemBinaryPresent() throws IOException { + String content = "content"; + String digest = sha512Hex(content); + ResourceItem resourceItem = resourceItem("resource1", null, ASSET, "SHA-512", digest); + createResourceBinary(resourceItem, content); + + FileResourceRepository testRepository = createTestRepository(); + + boolean result = testRepository.resourceItemBinaryPresent(resourceItem); + assertTrue(result); + } + + @Test + public void testResourceItemBinaryPresentHashMismatch() throws IOException { + String content = "content"; + ResourceItem resourceItem = resourceItem("resource1", null, ASSET, "SHA-512", "not_matching_hash"); + createResourceBinary(resourceItem, content); + + FileResourceRepository testRepository = createTestRepository(); + boolean result = testRepository.resourceItemBinaryPresent(resourceItem); + assertFalse(result); + } + + @Test + public void testResourceItemBinaryPresentUnsupportedHashAlgorithm() throws IOException { + String content = "content"; + ResourceItem resourceItem = resourceItem("resource1", null, ASSET, "unsupported_algorithm", "some_hash"); + createResourceBinary(resourceItem, content); + + FileResourceRepository testRepository = createTestRepository(); + assertThrowsExactly(RuntimeException.class, () -> testRepository.resourceItemBinaryPresent(resourceItem)); + } + + @Test + public void testAddResourceItemsWithoutContent() throws IOException { + FileResourceRepository testRepository = createTestRepository(); + + assertRepositoryInMemoryContentEqualsPersistedContent(testRepository); + assertTrue(testRepository.findAllResourceItems().isEmpty()); + + ResourceItem firstNewItem = resourceItem("resource1", null, ASSET); + Optional firstResult = testRepository.addResourceItem(firstNewItem); + + assertTrue(firstResult.isPresent()); + assertRepositoryInMemoryContentEqualsPersistedContent(testRepository); + assertEquals(1, testRepository.findAllResourceItems().size()); + assertEquals("resource1", testRepository.findAllResourceItems().getFirst().getResourceId()); + + ResourceItem secondNewItem = resourceItem("resource2", null, ASSET); + Optional secondResult = testRepository.addResourceItem(secondNewItem); + + assertTrue(secondResult.isPresent()); + assertRepositoryInMemoryContentEqualsPersistedContent(testRepository); + assertEquals(2, testRepository.findAllResourceItems().size()); + assertIterableEquals(List.of("resource1", "resource2"), testRepository.findAllResourceItems().stream().map(ResourceItem::getResourceId).toList()); + } + + @Test + public void testAddResourceItemsWithoutContentErrorCase() throws IOException { + FileResourceRepository testRepository = createTestRepository(); + + assertRepositoryInMemoryContentEqualsPersistedContent(testRepository); + assertTrue(testRepository.findAllResourceItems().isEmpty()); + + ResourceItem firstNewItem = resourceItem("resource1", null, ASSET); + Optional result; + try (MockedStatic mockedFiles = mockStatic(Files.class)) { + mockedFiles.when(() -> writeString(any(), any(), eq(CREATE), eq(TRUNCATE_EXISTING), eq(WRITE), eq(SYNC))).thenThrow(new IOException()); + + result = testRepository.addResourceItem(firstNewItem); + } + assertTrue(result.isEmpty()); + } + + @Test + public void testAddResourceItemsWithContent() throws IOException { + FileResourceRepository testRepository = createTestRepository(); + + assertRepositoryInMemoryContentEqualsPersistedContent(testRepository); + assertTrue(testRepository.findAllResourceItems().isEmpty()); + + ResourceItem firstNewItem = resourceItem("resource1", null, ASSET); + Path firstItemTempPath = createTempBinary(); + Path firstItemExpectedPath = resourcePath(firstNewItem); + Optional firstResult = testRepository.addResourceItem(firstNewItem, firstItemTempPath); + + assertTrue(firstResult.isPresent()); + assertRepositoryInMemoryContentEqualsPersistedContent(testRepository); + assertEquals(1, testRepository.findAllResourceItems().size()); + assertEquals("resource1", testRepository.findAllResourceItems().getFirst().getResourceId()); + assertFalse(exists(firstItemTempPath)); + assertTrue(exists(firstItemExpectedPath)); + + ResourceItem secondNewItem = resourceItem("resource2", "subdirectory", ASSET); + Path secondItemTempPath = createTempBinary(); + Path secondItemExpectedPath = resourcePath(secondNewItem); + Optional secondResult = testRepository.addResourceItem(secondNewItem, secondItemTempPath); + + assertTrue(secondResult.isPresent()); + assertRepositoryInMemoryContentEqualsPersistedContent(testRepository); + assertEquals(2, testRepository.findAllResourceItems().size()); + assertIterableEquals(List.of("resource1", "resource2"), testRepository.findAllResourceItems().stream().map(ResourceItem::getResourceId).toList()); + assertFalse(exists(secondItemTempPath)); + assertTrue(exists(firstItemExpectedPath)); + assertTrue(exists(secondItemExpectedPath)); + + ResourceItem thirdNewItem = resourceItem("resource3", null, EXTENSION); + Path thirdItemTempPath = createTempBinary(); + Path thirdItemExpectedPath = resourcePath(thirdNewItem); + Optional thirdResult = testRepository.addResourceItem(thirdNewItem, thirdItemTempPath); + + assertTrue(thirdResult.isPresent()); + assertRepositoryInMemoryContentEqualsPersistedContent(testRepository); + assertEquals(3, testRepository.findAllResourceItems().size()); + assertIterableEquals(List.of("resource1", "resource2", "resource3"), testRepository.findAllResourceItems().stream().map(ResourceItem::getResourceId).toList()); + assertFalse(exists(thirdItemTempPath)); + assertTrue(exists(firstItemExpectedPath)); + assertTrue(exists(secondItemExpectedPath)); + assertTrue(exists(thirdItemExpectedPath)); + } + + @Test + public void testAddResourceItemsWithContentErrorWhenCopying() throws IOException { + FileResourceRepository testRepository = createTestRepository(); + + assertRepositoryInMemoryContentEqualsPersistedContent(testRepository); + assertTrue(testRepository.findAllResourceItems().isEmpty()); + + ResourceItem resourceItem = resourceItem("resource1", null, ASSET); + Path resourceItemTempPath = Path.of("non_existing_path"); + Path resourceItemExpectedPath = resourcePath(resourceItem); + + Optional result = testRepository.addResourceItem(resourceItem, resourceItemTempPath); + + assertTrue(result.isEmpty()); + assertRepositoryInMemoryContentEqualsPersistedContent(testRepository); + assertEquals(0, testRepository.findAllResourceItems().size()); + assertFalse(exists(resourceItemTempPath)); + assertFalse(exists(resourceItemExpectedPath)); + } + + @Test + public void testDeleteResourceItem() throws IOException { + ResourceItem firstItem = resourceItem("resource1", null, ASSET); + Path firstItemPath = createResourceBinary(firstItem, RESOURCE_BINARY_CONTENT); + ResourceItem secondItem = resourceItem("resource2", null, ASSET); + Path secondItemPath = createResourceBinary(secondItem, RESOURCE_BINARY_CONTENT); + ResourceItem thirdItem = resourceItem("resource3", null, ASSET); + Path thirdItemPath = createResourceBinary(thirdItem, RESOURCE_BINARY_CONTENT); + ResourceRepositoryDescriptor initialRepositoryDescriptor = new ResourceRepositoryDescriptor(new ResourcesGlobalHash(), List.of(firstItem, secondItem, thirdItem)); + saveRepository(initialRepositoryDescriptor); + + FileResourceRepository testRepository = createTestRepository(); + + assertRepositoryInMemoryContentEqualsPersistedContent(testRepository); + assertEquals(3, testRepository.findAllResourceItems().size()); + assertIterableEquals(List.of("resource1", "resource2", "resource3"), testRepository.findAllResourceItems().stream().map(ResourceItem::getResourceId).toList()); + assertTrue(exists(firstItemPath)); + assertTrue(exists(secondItemPath)); + assertTrue(exists(thirdItemPath)); + + Optional firstResult = testRepository.deleteResourceItem(firstItem); + assertTrue(firstResult.isPresent()); + assertRepositoryInMemoryContentEqualsPersistedContent(testRepository); + assertEquals(2, testRepository.findAllResourceItems().size()); + assertIterableEquals(List.of("resource2", "resource3"), testRepository.findAllResourceItems().stream().map(ResourceItem::getResourceId).toList()); + assertFalse(exists(firstItemPath)); + assertTrue(exists(secondItemPath)); + assertTrue(exists(thirdItemPath)); + + Optional secondResult = testRepository.deleteResourceItem(thirdItem); + assertTrue(secondResult.isPresent()); + assertRepositoryInMemoryContentEqualsPersistedContent(testRepository); + assertEquals(1, testRepository.findAllResourceItems().size()); + assertIterableEquals(List.of("resource2"), testRepository.findAllResourceItems().stream().map(ResourceItem::getResourceId).toList()); + assertFalse(exists(firstItemPath)); + assertTrue(exists(secondItemPath)); + assertFalse(exists(thirdItemPath)); + + Optional thirdResult = testRepository.deleteResourceItem(secondItem); + assertTrue(thirdResult.isPresent()); + assertRepositoryInMemoryContentEqualsPersistedContent(testRepository); + assertEquals(0, testRepository.findAllResourceItems().size()); + assertIterableEquals(List.of(), testRepository.findAllResourceItems().stream().map(ResourceItem::getResourceId).toList()); + assertFalse(exists(firstItemPath)); + assertFalse(exists(secondItemPath)); + assertFalse(exists(thirdItemPath)); + } + + @Test + public void testDeleteResourceItemErrorCase() throws IOException { + FileResourceRepository testRepository = createTestRepository(); + assertRepositoryInMemoryContentEqualsPersistedContent(testRepository); + assertTrue(testRepository.findAllResourceItems().isEmpty()); + + ResourceItem toDeleteItem = resourceItem("resource1", null, ASSET); + Optional result; + try (MockedStatic mockedFiles = mockStatic(Files.class)) { + mockedFiles.when(() -> writeString(any(), any(), eq(CREATE), eq(TRUNCATE_EXISTING), eq(WRITE), eq(SYNC))).thenThrow(new IOException()); + + result = testRepository.deleteResourceItem(toDeleteItem); + } + assertTrue(result.isEmpty()); + } + + private ResourceRepositoryDescriptor loadRepository() throws IOException { + return c2Serializer.deserialize(readString(repositoryFile), ResourceRepositoryDescriptor.class).orElse(null); + } + + private void saveRepository(ResourceRepositoryDescriptor resourceRepositoryDescriptor) throws IOException { + writeString(repositoryFile, c2Serializer.serialize(resourceRepositoryDescriptor).orElse(EMPTY), CREATE, TRUNCATE_EXISTING, WRITE, SYNC); + } + + private FileResourceRepository createTestRepository() { + return new FileResourceRepository(assetDirectory, extensionDirectory, configDirectoryPath, c2Serializer); + } + + private ResourcesGlobalHash resourcesGlobalHash(String digest, String hashType) { + ResourcesGlobalHash resourcesGlobalHash = new ResourcesGlobalHash(); + resourcesGlobalHash.setDigest(digest); + resourcesGlobalHash.setHashType(hashType); + return resourcesGlobalHash; + } + + private ResourceItem resourceItem(String id, String path, ResourceType resourceType) { + return resourceItem(id, path, resourceType, null, null); + } + + private ResourceItem resourceItem(String id, String path, ResourceType resourceType, String hashType, String digest) { + ResourceItem resourceItem = new ResourceItem(); + resourceItem.setResourceId(id); + resourceItem.setResourceName(id); + resourceItem.setResourcePath(path); + resourceItem.setResourceType(resourceType); + resourceItem.setHashType(hashType); + resourceItem.setDigest(digest); + return resourceItem; + } + + private void assertRepositoryInMemoryContentEqualsPersistedContent(FileResourceRepository testRepository) throws IOException { + assertTrue(exists(repositoryFile)); + ResourceRepositoryDescriptor loadedRepositoryDescriptor = loadRepository(); + assertNotNull(loadedRepositoryDescriptor); + assertEquals(loadedRepositoryDescriptor.resourcesGlobalHash(), testRepository.findResourcesGlobalHash()); + assertIterableEquals(loadedRepositoryDescriptor.resourceItems(), testRepository.findAllResourceItems()); + } + + private Path resourcePath(ResourceItem item) { + Path resourcePath = switch (item.getResourceType()) { + case ASSET -> assetRepositoryDirectory.resolve(isBlank(item.getResourcePath()) ? item.getResourceName() : item.getResourcePath() + "/" + item.getResourceName()); + case EXTENSION -> extensionDirectory.resolve(item.getResourceName()); + }; + return resourcePath.toAbsolutePath(); + } + + private Path createResourceBinary(ResourceItem resourceItem, String content) throws IOException { + Path resourcePath = resourcePath(resourceItem); + createFile(resourcePath, content); + return resourcePath; + } + + private Path createTempBinary() throws IOException { + Path resourcePath = testBaseDirectory.resolve(randomUUID().toString()); + createFile(resourcePath, RESOURCE_BINARY_CONTENT); + return resourcePath; + } + + private void createFile(Path path, String content) throws IOException { + createDirectories(path.getParent()); + writeString(path, content); + } +} diff --git a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/conf/bootstrap.conf b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/conf/bootstrap.conf index 5570bbf4b6..680465a5d2 100644 --- a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/conf/bootstrap.conf +++ b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/conf/bootstrap.conf @@ -36,9 +36,6 @@ java.arg.1=-Dorg.apache.jasper.compiler.disablejsr199=true java.arg.2=-Xms${minifi.jvm.heap.mb}m java.arg.3=-Xmx${minifi.jvm.heap.mb}m -# Enable Remote Debugging -#java.arg.debug=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=8000 - # allowRestrictedHeaders is required for Cluster/Node communications to work properly java.arg.5=-Dsun.net.http.allowRestrictedHeaders=true java.arg.6=-Djava.protocol.handler.pkgs=sun.net.www.protocol @@ -91,13 +88,6 @@ nifi.minifi.sensitive.props.algorithm= #c2.agent.identifier= # If set to false heartbeat won't contain the manifest. Defaults to true. #c2.full.heartbeat=false -## define protocol parameters -# DEPRECATED: c2.rest.url and c2.rest.url.ack are deprecated in favor of c2.rest.path.* properties and are target to be removed in future release -# The absolute url of the C2 server's heartbeat endpoint, eg.: http://localhost/c2-server/api/heartbeat -#c2.rest.url= -# The absolute url of the C2 server's acknowledge endpoint, eg.: http://localhost/c2-server/api/acknowledge -#c2.rest.url.ack= -# C2 Rest Path Properties # The base path of the C2 server's REST API, eg.: http://localhost/c2-server/api #c2.rest.path.base= # Relative url of the C2 server's heartbeat endpoint, eg.: /heartbeat