NIFI-13242 MiNiFi Sync Resource C2 command

Signed-off-by: Ferenc Erdei <erdei.ferenc90@gmail.com>
This closes #8898.
This commit is contained in:
Ferenc Kis 2024-05-15 14:19:46 +02:00 committed by Ferenc Erdei
parent 7deac6afac
commit 3aa4ff9d56
No known key found for this signature in database
GPG Key ID: 023D856C60E92F96
37 changed files with 2160 additions and 197 deletions

View File

@ -17,7 +17,10 @@
package org.apache.nifi.c2.client.api;
import java.io.InputStream;
import java.nio.file.Path;
import java.util.Optional;
import java.util.function.Function;
import org.apache.nifi.c2.protocol.api.C2Heartbeat;
import org.apache.nifi.c2.protocol.api.C2HeartbeatResponse;
import org.apache.nifi.c2.protocol.api.C2OperationAck;
@ -58,6 +61,16 @@ public interface C2Client {
*/
Optional<byte[]> retrieveUpdateAssetContent(String callbackUrl);
/**
* Retrieves a resource from the C2 server. The resource is not materialized into a byte[],
* instead a consumer is provided to stream the data to a specified location
*
* @param callbackUrl url where the resource should be downloaded from
* @param resourceConsumer consumer to handle the incoming data as a stream
* @return the path of the downloaded resource. Will be empty if no content can be downloaded or an error occurred
*/
Optional<Path> retrieveResourceItem(String callbackUrl, Function<InputStream, Optional<Path>> resourceConsumer);
/**
* Uploads a binary bundle to C2 server
*

View File

@ -14,15 +14,19 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.c2.serializer;
import static java.util.Optional.empty;
import static java.util.Optional.ofNullable;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Optional;
import com.fasterxml.jackson.databind.module.SimpleModule;
import java.util.Optional;
import org.apache.nifi.c2.protocol.api.OperandType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -48,7 +52,7 @@ public class C2JacksonSerializer implements C2Serializer {
public <T> Optional<String> serialize(T object) {
if (object == null) {
logger.trace("C2 Object was null. Nothing to serialize. Returning empty.");
return Optional.empty();
return empty();
}
String contentString = null;
@ -58,14 +62,14 @@ public class C2JacksonSerializer implements C2Serializer {
logger.error("Object serialization to JSON failed", e);
}
return Optional.ofNullable(contentString);
return ofNullable(contentString);
}
@Override
public <T> Optional<T> deserialize(String content, Class<T> valueType) {
if (content == null) {
logger.trace("Content for deserialization was null. Returning empty.");
return Optional.empty();
logger.trace("Content for deserialization was null. Returning empty");
return empty();
}
T responseObject = null;
@ -75,6 +79,21 @@ public class C2JacksonSerializer implements C2Serializer {
logger.error("Object deserialization from JSON failed", e);
}
return Optional.ofNullable(responseObject);
return ofNullable(responseObject);
}
@Override
public <T> Optional<T> convert(Object content, TypeReference<T> valueType) {
if (content == null) {
logger.trace("Content for conversion was null. Returning empty");
return empty();
}
try {
return ofNullable(objectMapper.convertValue(content, valueType));
} catch (IllegalArgumentException e) {
logger.error("Object conversion failed", e);
return empty();
}
}
}

View File

@ -14,8 +14,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.c2.serializer;
import com.fasterxml.jackson.core.type.TypeReference;
import java.util.Optional;
/**
@ -33,12 +35,22 @@ public interface C2Serializer {
<T> Optional<String> serialize(T content);
/**
* Helper to deserialise an object
* Helper to deserialize an object
*
* @param content the string representation of the object to be deserialsed
* @param content the string representation of the object to be deserialized
* @param valueType the class of the target object
* @param <T> the type of the target object
* @return the deserialised object if successful empty otherwise
*/
<T> Optional<T> deserialize(String content, Class<T> valueType);
/**
* Helper to convert a JSON object into a specific type
*
* @param content the string representation of the object to be deserialized
* @param valueType the class of the target object
* @param <T> the type of the target object
* @return the converted object if the conversion was successful empty otherwise
*/
<T> Optional<T> convert(Object content, TypeReference<T> valueType);
}

View File

@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.c2.util;
public abstract class Preconditions {
private Preconditions() {
}
public static void requires(boolean criterion, String exceptionMessage) {
if (!criterion) {
throw new IllegalArgumentException(exceptionMessage);
}
}
}

View File

@ -17,12 +17,18 @@
package org.apache.nifi.c2.client.http;
import static java.lang.String.format;
import static java.util.Optional.empty;
import static java.util.Optional.ofNullable;
import static okhttp3.MultipartBody.FORM;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Path;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import okhttp3.Headers;
import okhttp3.MediaType;
import okhttp3.MultipartBody;
@ -95,6 +101,27 @@ public class C2HttpClient implements C2Client {
return retrieveContent(callbackUrl, Map.of());
}
@Override
public Optional<Path> retrieveResourceItem(String callbackUrl, Function<InputStream, Optional<Path>> resourceConsumer) {
Request request = new Request.Builder()
.get()
.url(callbackUrl)
.build();
try (Response response = httpClientReference.get().newCall(request).execute()) {
if (!response.isSuccessful()) {
throw new C2ServerException("Resource content retrieval failed with HTTP return code " + response.code());
}
return ofNullable(response.body())
.map(ResponseBody::byteStream)
.map(resourceConsumer::apply)
.orElseThrow(() -> new C2ServerException("Resource content retrieval failed with empty body"));
} catch (Exception e) {
logger.warn("Resource item retrieval failed", e);
return empty();
}
}
@Override
public Optional<String> uploadBundle(String callbackUrl, byte[] bundle) {
Request request = new Request.Builder()
@ -115,7 +142,7 @@ public class C2HttpClient implements C2Client {
logger.error("Could not upload bundle to C2 server {}", callbackUrl, e);
return Optional.of("Could not upload bundle to C2 server");
}
return Optional.empty();
return empty();
}
@Override
@ -124,7 +151,7 @@ public class C2HttpClient implements C2Client {
}
private Optional<C2HeartbeatResponse> sendHeartbeat(String heartbeat) {
Optional<C2HeartbeatResponse> c2HeartbeatResponse = Optional.empty();
Optional<C2HeartbeatResponse> c2HeartbeatResponse = empty();
Request request = new Request.Builder()
.post(RequestBody.create(heartbeat, MEDIA_TYPE_APPLICATION_JSON))
.url(c2UrlProvider.getHeartbeatUrl())
@ -151,7 +178,7 @@ public class C2HttpClient implements C2Client {
logger.error("HTTP Request failed", e);
}
return Optional.ofNullable(responseBody);
return ofNullable(responseBody);
}
private void sendAck(Request request) {
@ -165,7 +192,7 @@ public class C2HttpClient implements C2Client {
}
private Optional<byte[]> retrieveContent(String callbackUrl, Map<String, String> httpHeaders) {
Optional<byte[]> content = Optional.empty();
Optional<byte[]> content = empty();
Request.Builder requestBuilder = new Request.Builder()
.get()
@ -174,10 +201,10 @@ public class C2HttpClient implements C2Client {
Request request = requestBuilder.build();
try (Response response = httpClientReference.get().newCall(request).execute()) {
Optional<ResponseBody> body = Optional.ofNullable(response.body());
Optional<ResponseBody> body = ofNullable(response.body());
if (!response.isSuccessful()) {
StringBuilder messageBuilder = new StringBuilder(String.format("Update content retrieval failed: HTTP %d", response.code()));
StringBuilder messageBuilder = new StringBuilder(format("Update content retrieval failed: HTTP %d", response.code()));
body.map(Object::toString).ifPresent(messageBuilder::append);
throw new C2ServerException(messageBuilder.toString());
}

View File

@ -86,7 +86,7 @@ public class C2HttpClientTest {
mockWebServer.enqueue(new MockResponse().setBody("responseBody"));
when(serializer.serialize(any(C2Heartbeat.class))).thenReturn(Optional.of("Heartbeat"));
when(serializer.deserialize(any(), any())).thenReturn(Optional.of(hbResponse));
when(serializer.deserialize(any(), any(Class.class))).thenReturn(Optional.of(hbResponse));
C2HttpClient c2HttpClient = C2HttpClient.create(c2ClientConfig, serializer);
Optional<C2HeartbeatResponse> response = c2HttpClient.publishHeartbeat(new C2Heartbeat());

View File

@ -38,6 +38,7 @@ import java.util.Comparator;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Supplier;
import org.apache.nifi.c2.client.C2ClientConfig;
import org.apache.nifi.c2.client.PersistentUuidGenerator;
import org.apache.nifi.c2.client.service.model.RuntimeInfoWrapper;
@ -46,12 +47,14 @@ import org.apache.nifi.c2.protocol.api.AgentManifest;
import org.apache.nifi.c2.protocol.api.AgentRepositories;
import org.apache.nifi.c2.protocol.api.AgentResourceConsumption;
import org.apache.nifi.c2.protocol.api.AgentStatus;
import org.apache.nifi.c2.protocol.api.ResourceInfo;
import org.apache.nifi.c2.protocol.api.C2Heartbeat;
import org.apache.nifi.c2.protocol.api.DeviceInfo;
import org.apache.nifi.c2.protocol.api.FlowInfo;
import org.apache.nifi.c2.protocol.api.FlowQueueStatus;
import org.apache.nifi.c2.protocol.api.NetworkInfo;
import org.apache.nifi.c2.protocol.api.SupportedOperation;
import org.apache.nifi.c2.protocol.api.ResourcesGlobalHash;
import org.apache.nifi.c2.protocol.api.SystemInfo;
import org.apache.nifi.c2.protocol.component.api.RuntimeManifest;
import org.slf4j.Logger;
@ -67,15 +70,18 @@ public class C2HeartbeatFactory {
private final C2ClientConfig clientConfig;
private final FlowIdHolder flowIdHolder;
private final ManifestHashProvider manifestHashProvider;
private final Supplier<ResourcesGlobalHash> resourcesGlobalHashSupplier;
private String agentId;
private String deviceId;
private File confDirectory;
public C2HeartbeatFactory(C2ClientConfig clientConfig, FlowIdHolder flowIdHolder, ManifestHashProvider manifestHashProvider) {
public C2HeartbeatFactory(C2ClientConfig clientConfig, FlowIdHolder flowIdHolder, ManifestHashProvider manifestHashProvider,
Supplier<ResourcesGlobalHash> resourcesGlobalHashSupplier) {
this.clientConfig = clientConfig;
this.flowIdHolder = flowIdHolder;
this.manifestHashProvider = manifestHashProvider;
this.resourcesGlobalHashSupplier = resourcesGlobalHashSupplier;
}
public C2Heartbeat create(RuntimeInfoWrapper runtimeInfoWrapper) {
@ -86,6 +92,10 @@ public class C2HeartbeatFactory {
heartbeat.setFlowInfo(getFlowInfo(runtimeInfoWrapper.getQueueStatus()));
heartbeat.setCreated(System.currentTimeMillis());
ResourceInfo resourceInfo = new ResourceInfo();
resourceInfo.setHash(resourcesGlobalHashSupplier.get().getDigest());
heartbeat.setResourceInfo(resourceInfo);
return heartbeat;
}

View File

@ -14,13 +14,20 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.c2.client.service.operation;
import static java.util.Optional.ofNullable;
import com.fasterxml.jackson.core.type.TypeReference;
import java.util.Map;
import java.util.Optional;
import org.apache.nifi.c2.protocol.api.C2Operation;
import org.apache.nifi.c2.protocol.api.C2OperationAck;
import org.apache.nifi.c2.protocol.api.C2OperationState;
import org.apache.nifi.c2.protocol.api.OperandType;
import org.apache.nifi.c2.protocol.api.OperationType;
import org.apache.nifi.c2.serializer.C2Serializer;
/**
* Handler interface for the different operation types
@ -50,6 +57,7 @@ public interface C2OperationHandler {
/**
* Determines if the given operation requires to restart the MiNiFi process
*
* @return true if it requires restart, false otherwise
*/
default boolean requiresRestart() {
@ -63,4 +71,60 @@ public interface C2OperationHandler {
* @return the result of the operation handling
*/
C2OperationAck handle(C2Operation operation);
/**
* Commonly used logic for creating an C2OperationState object
*
* @param operationState the state of the operation
* @param details additional status info to detail the state
* @return the created state
*/
default C2OperationState operationState(C2OperationState.OperationState operationState, String details) {
C2OperationState state = new C2OperationState();
state.setState(operationState);
state.setDetails(details);
return state;
}
/**
* Commonly used logic for creating an C2OperationAck object
*
* @param operationId the identifier of the operation
* @param operationState the state of the operation
* @return the created operation ack object
*/
default C2OperationAck operationAck(String operationId, C2OperationState operationState) {
C2OperationAck operationAck = new C2OperationAck();
operationAck.setOperationState(operationState);
operationAck.setOperationId(operationId);
return operationAck;
}
/**
* Commonly used logic for retrieving a string value from the operation arguments' map
*
* @param operation the operation with arguments
* @param argument the name of the argument to retrieve
* @return the optional retrieved argument value
*/
default Optional<String> getOperationArg(C2Operation operation, String argument) {
return ofNullable(operation.getArgs())
.map(args -> args.get(argument))
.map(arg -> arg instanceof String s ? s : null);
}
/**
* Commonly used logic for retrieving a JSON object from the operation arguments' map and converting it to a model class
*
* @param operation the operation with arguments
* @param argument the name of the argument to retrieve
* @param type the target type to serialize the object to
* @param serializer the serializer used to converting to the target class
* @return the optional retrieved and converted argument value
*/
default <T> Optional<T> getOperationArg(C2Operation operation, String argument, TypeReference<T> type, C2Serializer serializer) {
return ofNullable(operation.getArgs())
.map(args -> args.get(argument))
.flatMap(arg -> serializer.convert(arg, type));
}
}

View File

@ -14,14 +14,16 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.c2.client.service.operation;
import static java.util.Optional.ofNullable;
import static org.apache.commons.lang3.StringUtils.EMPTY;
import static org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.FULLY_APPLIED;
import static org.apache.nifi.c2.protocol.api.OperandType.MANIFEST;
import static org.apache.nifi.c2.protocol.api.OperationType.DESCRIBE;
import java.util.Map;
import java.util.Optional;
import java.util.function.Supplier;
import org.apache.nifi.c2.client.service.C2HeartbeatFactory;
import org.apache.nifi.c2.client.service.model.RuntimeInfoWrapper;
@ -29,7 +31,6 @@ import org.apache.nifi.c2.protocol.api.AgentInfo;
import org.apache.nifi.c2.protocol.api.C2Heartbeat;
import org.apache.nifi.c2.protocol.api.C2Operation;
import org.apache.nifi.c2.protocol.api.C2OperationAck;
import org.apache.nifi.c2.protocol.api.C2OperationState;
import org.apache.nifi.c2.protocol.api.OperandType;
import org.apache.nifi.c2.protocol.api.OperationType;
@ -58,25 +59,24 @@ public class DescribeManifestOperationHandler implements C2OperationHandler {
@Override
public C2OperationAck handle(C2Operation operation) {
String opIdentifier = Optional.ofNullable(operation.getIdentifier())
.orElse(EMPTY);
C2OperationAck operationAck = new C2OperationAck();
C2OperationState state = new C2OperationState();
operationAck.setOperationState(state);
operationAck.setOperationId(opIdentifier);
String operationId = ofNullable(operation.getIdentifier()).orElse(EMPTY);
RuntimeInfoWrapper runtimeInfoWrapper = runtimeInfoSupplier.get();
C2Heartbeat heartbeat = heartbeatFactory.create(runtimeInfoWrapper);
C2OperationAck c2OperationAck = operationAck(operationId, operationState(FULLY_APPLIED, EMPTY));
c2OperationAck.setAgentInfo(agentInfo(heartbeat, runtimeInfoWrapper));
c2OperationAck.setDeviceInfo(heartbeat.getDeviceInfo());
c2OperationAck.setFlowInfo(heartbeat.getFlowInfo());
c2OperationAck.setResourceInfo(heartbeat.getResourceInfo());
return c2OperationAck;
}
private AgentInfo agentInfo(C2Heartbeat heartbeat, RuntimeInfoWrapper runtimeInfoWrapper) {
AgentInfo agentInfo = heartbeat.getAgentInfo();
agentInfo.setAgentManifest(runtimeInfoWrapper.getManifest());
operationAck.setAgentInfo(agentInfo);
operationAck.setDeviceInfo(heartbeat.getDeviceInfo());
operationAck.setFlowInfo(heartbeat.getFlowInfo());
state.setState(C2OperationState.OperationState.FULLY_APPLIED);
return operationAck;
return agentInfo;
}
@Override

View File

@ -0,0 +1,119 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.c2.client.service.operation;
import static java.util.Optional.ofNullable;
import static org.apache.commons.lang3.StringUtils.EMPTY;
import static org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.NOT_APPLIED;
import static org.apache.nifi.c2.protocol.api.OperandType.RESOURCE;
import static org.apache.nifi.c2.protocol.api.OperationType.SYNC;
import static org.apache.nifi.c2.util.Preconditions.requires;
import com.fasterxml.jackson.core.type.TypeReference;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.nifi.c2.client.api.C2Client;
import org.apache.nifi.c2.protocol.api.C2Operation;
import org.apache.nifi.c2.protocol.api.C2OperationAck;
import org.apache.nifi.c2.protocol.api.C2OperationState;
import org.apache.nifi.c2.protocol.api.C2OperationState.OperationState;
import org.apache.nifi.c2.protocol.api.OperandType;
import org.apache.nifi.c2.protocol.api.OperationType;
import org.apache.nifi.c2.protocol.api.ResourceItem;
import org.apache.nifi.c2.protocol.api.ResourcesGlobalHash;
import org.apache.nifi.c2.serializer.C2Serializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SyncResourceOperationHandler implements C2OperationHandler {
static final String GLOBAL_HASH_FIELD = "globalHash";
static final String RESOURCE_LIST_FIELD = "resourceList";
private static final Logger LOG = LoggerFactory.getLogger(SyncResourceOperationHandler.class);
private final C2Client c2Client;
private final OperandPropertiesProvider operandPropertiesProvider;
private final SyncResourceStrategy syncResourceStrategy;
private final C2Serializer c2Serializer;
public SyncResourceOperationHandler(C2Client c2Client, OperandPropertiesProvider operandPropertiesProvider, SyncResourceStrategy syncResourceStrategy,
C2Serializer c2Serializer) {
this.c2Client = c2Client;
this.operandPropertiesProvider = operandPropertiesProvider;
this.syncResourceStrategy = syncResourceStrategy;
this.c2Serializer = c2Serializer;
}
public static SyncResourceOperationHandler create(C2Client c2Client, OperandPropertiesProvider operandPropertiesProvider, SyncResourceStrategy syncResourceStrategy,
C2Serializer c2Serializer) {
requires(c2Client != null, "C2Client should not be null");
requires(operandPropertiesProvider != null, "OperandPropertiesProvider should not be not null");
requires(syncResourceStrategy != null, "Sync resource strategy should not be null");
requires(c2Serializer != null, "C2 serializer should not be null");
return new SyncResourceOperationHandler(c2Client, operandPropertiesProvider, syncResourceStrategy, c2Serializer);
}
@Override
public OperationType getOperationType() {
return SYNC;
}
@Override
public OperandType getOperandType() {
return RESOURCE;
}
@Override
public Map<String, Object> getProperties() {
return operandPropertiesProvider.getProperties();
}
@Override
public C2OperationAck handle(C2Operation operation) {
String operationId = ofNullable(operation.getIdentifier()).orElse(EMPTY);
Optional<ResourcesGlobalHash> resourcesGlobalHash = getOperationArg(operation, GLOBAL_HASH_FIELD, new TypeReference<>() {
}, c2Serializer);
if (resourcesGlobalHash.isEmpty()) {
LOG.error("Resources global hash could not be constructed from C2 request");
return operationAck(operationId, operationState(NOT_APPLIED, "Resources global hash element was not found"));
}
Optional<List<ResourceItem>> resourceItems = getOperationArg(operation, RESOURCE_LIST_FIELD, new TypeReference<>() {
}, c2Serializer);
if (resourceItems.isEmpty()) {
LOG.error("Resource item list could not be constructed from C2 request");
return operationAck(operationId, operationState(NOT_APPLIED, "Resource item list element was not found"));
}
OperationState operationState = syncResourceStrategy.synchronizeResourceRepository(resourcesGlobalHash.get(), resourceItems.get(), c2Client::retrieveResourceItem,
relativeUrl -> c2Client.getCallbackUrl(null, relativeUrl));
C2OperationState resultState = operationState(
operationState,
switch (operationState) {
case NOT_APPLIED -> "No resource items were retrieved, please check the log for errors";
case PARTIALLY_APPLIED -> "Resource repository is partially synced, retrieving some items failed. Pleas check log for errors";
case FULLY_APPLIED -> "Agent Resource repository is in sync with the C2 server";
default -> "Unexpected status, please check the log for errors";
}
);
return operationAck(operationId, resultState);
}
}

View File

@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.c2.client.service.operation;
import java.io.InputStream;
import java.nio.file.Path;
import java.util.List;
import java.util.Optional;
import java.util.function.BiFunction;
import java.util.function.Function;
import org.apache.nifi.c2.protocol.api.C2OperationState.OperationState;
import org.apache.nifi.c2.protocol.api.ResourceItem;
import org.apache.nifi.c2.protocol.api.ResourcesGlobalHash;
public interface SyncResourceStrategy {
OperationState synchronizeResourceRepository(ResourcesGlobalHash resourcesGlobalHash, List<ResourceItem> c2ServerItems,
BiFunction<String, Function<InputStream, Optional<Path>>, Optional<Path>> resourceDownloadFunction,
Function<String, Optional<String>> urlEnrichFunction);
}

View File

@ -21,7 +21,6 @@ import static java.nio.file.Files.copy;
import static java.nio.file.Files.createTempDirectory;
import static java.nio.file.Files.lines;
import static java.nio.file.Files.walk;
import static java.util.Collections.emptyMap;
import static java.util.Optional.empty;
import static java.util.Optional.ofNullable;
import static org.apache.commons.io.IOUtils.closeQuietly;
@ -30,6 +29,7 @@ import static org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.FU
import static org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.NOT_APPLIED;
import static org.apache.nifi.c2.protocol.api.OperandType.DEBUG;
import static org.apache.nifi.c2.protocol.api.OperationType.TRANSFER;
import static org.apache.nifi.c2.util.Preconditions.requires;
import java.io.ByteArrayOutputStream;
import java.io.File;
@ -55,7 +55,6 @@ import org.apache.nifi.c2.client.api.C2Client;
import org.apache.nifi.c2.protocol.api.C2Operation;
import org.apache.nifi.c2.protocol.api.C2OperationAck;
import org.apache.nifi.c2.protocol.api.C2OperationState;
import org.apache.nifi.c2.protocol.api.C2OperationState.OperationState;
import org.apache.nifi.c2.protocol.api.OperandType;
import org.apache.nifi.c2.protocol.api.OperationType;
import org.slf4j.Logger;
@ -88,18 +87,10 @@ public class TransferDebugOperationHandler implements C2OperationHandler {
public static TransferDebugOperationHandler create(C2Client c2Client, OperandPropertiesProvider operandPropertiesProvider,
List<Path> bundleFilePaths, Predicate<String> contentFilter) {
if (c2Client == null) {
throw new IllegalArgumentException("C2Client should not be null");
}
if (operandPropertiesProvider == null) {
throw new IllegalArgumentException("OperandPropertiesProvider should not be not null");
}
if (bundleFilePaths == null || bundleFilePaths.isEmpty()) {
throw new IllegalArgumentException("bundleFilePaths should not be not null or empty");
}
if (contentFilter == null) {
throw new IllegalArgumentException("Content filter should not be null");
}
requires(c2Client != null, "C2Client should not be null");
requires(operandPropertiesProvider != null, "OperandPropertiesProvider should not be not null");
requires(bundleFilePaths != null && !bundleFilePaths.isEmpty(), "BundleFilePaths should not be not null or empty");
requires(contentFilter != null, "Content filter should not be null");
return new TransferDebugOperationHandler(c2Client, operandPropertiesProvider, bundleFilePaths, contentFilter);
}
@ -120,17 +111,18 @@ public class TransferDebugOperationHandler implements C2OperationHandler {
@Override
public C2OperationAck handle(C2Operation operation) {
Map<String, String> arguments = ofNullable(operation.getArgs()).orElse(emptyMap());
Optional<String> callbackUrl = c2Client.getCallbackUrl(arguments.get(TARGET_ARG), arguments.get(RELATIVE_TARGET_ARG));
if (!callbackUrl.isPresent()) {
String operationId = ofNullable(operation.getIdentifier()).orElse(EMPTY);
Optional<String> callbackUrl = c2Client.getCallbackUrl(getOperationArg(operation, TARGET_ARG).orElse(EMPTY), getOperationArg(operation, RELATIVE_TARGET_ARG).orElse(EMPTY));
if (callbackUrl.isEmpty()) {
LOG.error("Callback URL could not be constructed from C2 request and current configuration");
return operationAck(operation, operationState(NOT_APPLIED, C2_CALLBACK_URL_NOT_FOUND));
return operationAck(operationId, operationState(NOT_APPLIED, C2_CALLBACK_URL_NOT_FOUND));
}
List<Path> preparedFiles = null;
C2OperationState operationState;
try {
preparedFiles = prepareFiles(operation.getIdentifier(), bundleFilePaths);
preparedFiles = prepareFiles(operationId, bundleFilePaths);
operationState = createDebugBundle(preparedFiles)
.map(bundle -> c2Client.uploadBundle(callbackUrl.get(), bundle)
.map(errorMessage -> operationState(NOT_APPLIED, errorMessage))
@ -144,21 +136,7 @@ public class TransferDebugOperationHandler implements C2OperationHandler {
}
LOG.debug("Returning operation ack for operation {} with state {} and details {}", operation.getIdentifier(), operationState.getState(), operationState.getDetails());
return operationAck(operation, operationState);
}
private C2OperationAck operationAck(C2Operation operation, C2OperationState state) {
C2OperationAck operationAck = new C2OperationAck();
operationAck.setOperationId(ofNullable(operation.getIdentifier()).orElse(EMPTY));
operationAck.setOperationState(state);
return operationAck;
}
private C2OperationState operationState(OperationState operationState, String details) {
C2OperationState state = new C2OperationState();
state.setState(operationState);
state.setDetails(details);
return state;
return operationAck(operationId, operationState);
}
private List<Path> prepareFiles(String operationId, List<Path> bundleFilePaths) throws IOException {

View File

@ -17,8 +17,7 @@
package org.apache.nifi.c2.client.service.operation;
import static java.lang.Boolean.parseBoolean;
import static java.util.Collections.emptyMap;
import static java.lang.Boolean.FALSE;
import static java.util.Optional.ofNullable;
import static org.apache.commons.lang3.StringUtils.EMPTY;
import static org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.FULLY_APPLIED;
@ -26,6 +25,7 @@ import static org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.NO
import static org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.NO_OPERATION;
import static org.apache.nifi.c2.protocol.api.OperandType.ASSET;
import static org.apache.nifi.c2.protocol.api.OperationType.UPDATE;
import static org.apache.nifi.c2.util.Preconditions.requires;
import java.util.Map;
import java.util.Optional;
@ -35,7 +35,6 @@ import org.apache.nifi.c2.client.api.C2Client;
import org.apache.nifi.c2.protocol.api.C2Operation;
import org.apache.nifi.c2.protocol.api.C2OperationAck;
import org.apache.nifi.c2.protocol.api.C2OperationState;
import org.apache.nifi.c2.protocol.api.C2OperationState.OperationState;
import org.apache.nifi.c2.protocol.api.OperandType;
import org.apache.nifi.c2.protocol.api.OperationType;
import org.slf4j.Logger;
@ -72,19 +71,10 @@ public class UpdateAssetOperationHandler implements C2OperationHandler {
public static UpdateAssetOperationHandler create(C2Client c2Client, OperandPropertiesProvider operandPropertiesProvider,
BiPredicate<String, Boolean> assetUpdatePrecondition, BiFunction<String, byte[], Boolean> assetPersistFunction) {
if (c2Client == null) {
throw new IllegalArgumentException("C2Client should not be null");
}
if (operandPropertiesProvider == null) {
throw new IllegalArgumentException("OperandPropertiesProvider should not be not null");
}
if (assetUpdatePrecondition == null) {
throw new IllegalArgumentException("Asset update precondition should not be null");
}
if (assetPersistFunction == null) {
throw new IllegalArgumentException("Asset persist function should not be null");
}
requires(c2Client != null, "C2Client should not be null");
requires(operandPropertiesProvider != null, "OperandPropertiesProvider should not be not null");
requires(assetUpdatePrecondition != null, "Asset update precondition should not be null");
requires(assetPersistFunction != null, "Asset persist function should not be null");
return new UpdateAssetOperationHandler(c2Client, operandPropertiesProvider, assetUpdatePrecondition, assetPersistFunction);
}
@ -107,24 +97,25 @@ public class UpdateAssetOperationHandler implements C2OperationHandler {
public C2OperationAck handle(C2Operation operation) {
String operationId = ofNullable(operation.getIdentifier()).orElse(EMPTY);
Optional<String> callbackUrl = c2Client.getCallbackUrl(getOperationArg(operation, ASSET_URL_KEY), getOperationArg(operation, ASSET_RELATIVE_URL_KEY));
if (!callbackUrl.isPresent()) {
Optional<String> callbackUrl =
c2Client.getCallbackUrl(getOperationArg(operation, ASSET_URL_KEY).orElse(EMPTY), getOperationArg(operation, ASSET_RELATIVE_URL_KEY).orElse(EMPTY));
if (callbackUrl.isEmpty()) {
LOG.error("Callback URL could not be constructed from C2 request and current configuration");
return operationAck(operationId, operationState(NOT_APPLIED, C2_CALLBACK_URL_NOT_FOUND));
}
String assetFileName = getOperationArg(operation, ASSET_FILE_KEY);
if (assetFileName == null) {
Optional<String> assetFileName = getOperationArg(operation, ASSET_FILE_KEY);
if (assetFileName.isEmpty()) {
LOG.error("Asset file name with key={} was not found in C2 request. C2 request arguments={}", ASSET_FILE_KEY, operation.getArgs());
return operationAck(operationId, operationState(NOT_APPLIED, ASSET_FILE_NAME_NOT_FOUND));
}
boolean forceDownload = parseBoolean(getOperationArg(operation, ASSET_FORCE_DOWNLOAD_KEY));
boolean forceDownload = getOperationArg(operation, ASSET_FORCE_DOWNLOAD_KEY).map(Boolean::parseBoolean).orElse(FALSE);
LOG.info("Initiating asset update from url {} with name {}, force update is {}", callbackUrl, assetFileName, forceDownload);
C2OperationState operationState = assetUpdatePrecondition.test(assetFileName, forceDownload)
C2OperationState operationState = assetUpdatePrecondition.test(assetFileName.get(), forceDownload)
? c2Client.retrieveUpdateAssetContent(callbackUrl.get())
.map(content -> assetPersistFunction.apply(assetFileName, content)
.map(content -> assetPersistFunction.apply(assetFileName.get(), content)
? operationState(FULLY_APPLIED, SUCCESSFULLY_UPDATE_ASSET)
: operationState(NOT_APPLIED, FAILED_TO_PERSIST_ASSET_TO_DISK))
.orElseGet(() -> operationState(NOT_APPLIED, UPDATE_ASSET_RETRIEVAL_RESULTED_IN_EMPTY_CONTENT))
@ -132,22 +123,4 @@ public class UpdateAssetOperationHandler implements C2OperationHandler {
return operationAck(operationId, operationState);
}
private String getOperationArg(C2Operation operation, String argument) {
return ofNullable(operation.getArgs()).orElse(emptyMap()).get(argument);
}
private C2OperationState operationState(OperationState operationState, String details) {
C2OperationState state = new C2OperationState();
state.setState(operationState);
state.setDetails(details);
return state;
}
private C2OperationAck operationAck(String operationId, C2OperationState operationState) {
C2OperationAck operationAck = new C2OperationAck();
operationAck.setOperationState(operationState);
operationAck.setOperationId(operationId);
return operationAck;
}
}

View File

@ -17,7 +17,7 @@
package org.apache.nifi.c2.client.service.operation;
import static java.util.Collections.emptyMap;
import static java.util.Optional.empty;
import static java.util.Optional.ofNullable;
import static org.apache.commons.lang3.StringUtils.EMPTY;
import static org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.FULLY_APPLIED;
@ -88,32 +88,31 @@ public class UpdateConfigurationOperationHandler implements C2OperationHandler {
@Override
public C2OperationAck handle(C2Operation operation) {
String operationId = Optional.ofNullable(operation.getIdentifier()).orElse(EMPTY);
String operationId = ofNullable(operation.getIdentifier()).orElse(EMPTY);
Map<String, String> arguments = ofNullable(operation.getArgs()).orElse(emptyMap());
String absoluteFlowUrl = ofNullable(arguments.get(FLOW_URL_KEY)).orElse(arguments.get(LOCATION));
Optional<String> callbackUrl = client.getCallbackUrl(absoluteFlowUrl, arguments.get(FLOW_RELATIVE_URL_KEY));
if (!callbackUrl.isPresent()) {
String absoluteFlowUrl = getOperationArg(operation, FLOW_URL_KEY).orElse(getOperationArg(operation, LOCATION).orElse(EMPTY));
Optional<String> callbackUrl = client.getCallbackUrl(absoluteFlowUrl, getOperationArg(operation, FLOW_RELATIVE_URL_KEY).orElse(EMPTY));
if (callbackUrl.isEmpty()) {
logger.error("Callback URL could not be constructed from C2 request and current configuration");
return operationAck(operationId, operationState(NOT_APPLIED, "Could not get callback url from operation and current configuration"));
}
String flowId = getFlowId(operation.getArgs(), callbackUrl.get());
if (flowId == null) {
Optional<String> flowId = getFlowId(operation, callbackUrl.get());
if (flowId.isEmpty()) {
logger.error("FlowId is missing, no update will be performed");
return operationAck(operationId, operationState(NOT_APPLIED, "Could not get flowId from the operation"));
}
if (flowIdHolder.getFlowId() != null && flowIdHolder.getFlowId().equals(flowId)) {
if (flowIdHolder.getFlowId() != null && flowIdHolder.getFlowId().equals(flowId.get())) {
logger.info("Flow is current, no update is necessary");
return operationAck(operationId, operationState(NO_OPERATION, "Flow is current, no update is necessary"));
}
logger.info("Will perform flow update from {} for operation #{}. Previous flow id was {}, replacing with new id {}",
callbackUrl, operationId, ofNullable(flowIdHolder.getFlowId()).orElse("not set"), flowId);
callbackUrl, operationId, ofNullable(flowIdHolder.getFlowId()).orElse("not set"), flowId.get());
C2OperationState state = updateFlow(operationId, callbackUrl.get());
if (state.getState() == FULLY_APPLIED) {
flowIdHolder.setFlowId(flowId);
flowIdHolder.setFlowId(flowId.get());
}
return operationAck(operationId, state);
}
@ -121,7 +120,7 @@ public class UpdateConfigurationOperationHandler implements C2OperationHandler {
private C2OperationState updateFlow(String opIdentifier, String callbackUrl) {
Optional<byte[]> updateContent = client.retrieveUpdateConfigurationContent(callbackUrl);
if (!updateContent.isPresent()) {
if (updateContent.isEmpty()) {
logger.error("Update content retrieval resulted in empty content so flow update was omitted for operation #{}.", opIdentifier);
return operationState(NOT_APPLIED, "Update content retrieval resulted in empty content");
}
@ -135,37 +134,21 @@ public class UpdateConfigurationOperationHandler implements C2OperationHandler {
return operationState(FULLY_APPLIED, "Update configuration applied successfully");
}
private String getFlowId(Map<String, String> args, String callbackUrl) {
return Optional.ofNullable(args)
.map(map -> map.get(FLOW_ID))
.orElseGet(() -> parseFlowId(callbackUrl));
private Optional<String> getFlowId(C2Operation operation, String callbackUrl) {
return getOperationArg(operation, FLOW_ID).or(() -> parseFlowId(callbackUrl));
}
private String parseFlowId(String callbackUrl) {
private Optional<String> parseFlowId(String callbackUrl) {
try {
URI flowUri = new URI(callbackUrl);
Matcher matcher = FLOW_ID_PATTERN.matcher(flowUri.getPath());
if (matcher.matches()) {
return matcher.group(1);
return ofNullable(matcher.group(1));
}
} catch (Exception e) {
logger.error("Could not get flow id from the provided URL, flow update URL format unexpected [{}]", callbackUrl);
}
return null;
}
private C2OperationState operationState(C2OperationState.OperationState operationState, String details) {
C2OperationState state = new C2OperationState();
state.setState(operationState);
state.setDetails(details);
return state;
}
private C2OperationAck operationAck(String operationId, C2OperationState operationState) {
C2OperationAck operationAck = new C2OperationAck();
operationAck.setOperationState(operationState);
operationAck.setOperationId(operationId);
return operationAck;
return empty();
}
}

View File

@ -17,6 +17,8 @@
package org.apache.nifi.c2.client.service.operation;
import static java.util.Optional.ofNullable;
import static org.apache.commons.lang3.StringUtils.EMPTY;
import static org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.FULLY_APPLIED;
import static org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.NOT_APPLIED;
import static org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.NO_OPERATION;
@ -38,9 +40,9 @@ public class UpdatePropertiesOperationHandler implements C2OperationHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(UpdatePropertiesOperationHandler.class);
private final OperandPropertiesProvider operandPropertiesProvider;
private final Function<Map<String, String>, Boolean> persistProperties;
private final Function<Map<String, Object>, Boolean> persistProperties;
public UpdatePropertiesOperationHandler(OperandPropertiesProvider operandPropertiesProvider, Function<Map<String, String>, Boolean> persistProperties) {
public UpdatePropertiesOperationHandler(OperandPropertiesProvider operandPropertiesProvider, Function<Map<String, Object>, Boolean> persistProperties) {
this.operandPropertiesProvider = operandPropertiesProvider;
this.persistProperties = persistProperties;
}
@ -62,27 +64,24 @@ public class UpdatePropertiesOperationHandler implements C2OperationHandler {
@Override
public C2OperationAck handle(C2Operation operation) {
C2OperationAck c2OperationAck = new C2OperationAck();
c2OperationAck.setOperationId(operation.getIdentifier());
C2OperationState operationState = new C2OperationState();
c2OperationAck.setOperationState(operationState);
String operationId = ofNullable(operation.getIdentifier()).orElse(EMPTY);
C2OperationState c2OperationState;
try {
if (persistProperties.apply(operation.getArgs())) {
operationState.setState(FULLY_APPLIED);
c2OperationState = operationState(FULLY_APPLIED, null);
} else {
LOGGER.info("Properties are already in desired state");
operationState.setState(NO_OPERATION);
c2OperationState = operationState(NO_OPERATION, null);
}
} catch (IllegalArgumentException e) {
LOGGER.error(e.getMessage());
operationState.setState(NOT_APPLIED);
operationState.setDetails(e.getMessage());
LOGGER.error("Operation not applied due to issues with the arguments: {}", e.getMessage());
c2OperationState = operationState(NOT_APPLIED, e.getMessage());
} catch (Exception e) {
LOGGER.error("Exception happened during persisting properties", e);
operationState.setState(NOT_APPLIED);
operationState.setDetails("Failed to persist properties");
LOGGER.error("Unexpected error happened during persisting properties", e);
c2OperationState = operationState(NOT_APPLIED, "Failed to persist properties");
}
return c2OperationAck;
return operationAck(operationId, c2OperationState);
}
@Override

View File

@ -29,6 +29,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;
import org.apache.nifi.c2.client.C2ClientConfig;
import org.apache.nifi.c2.client.service.model.RuntimeInfoWrapper;
import org.apache.nifi.c2.protocol.api.AgentManifest;
@ -37,6 +38,7 @@ import org.apache.nifi.c2.protocol.api.C2Heartbeat;
import org.apache.nifi.c2.protocol.api.FlowQueueStatus;
import org.apache.nifi.c2.protocol.api.OperationType;
import org.apache.nifi.c2.protocol.api.SupportedOperation;
import org.apache.nifi.c2.protocol.api.ResourcesGlobalHash;
import org.apache.nifi.c2.protocol.component.api.Bundle;
import org.apache.nifi.c2.protocol.component.api.RuntimeManifest;
import org.junit.jupiter.api.BeforeEach;
@ -53,6 +55,7 @@ public class C2HeartbeatFactoryTest {
private static final String AGENT_CLASS = "agentClass";
private static final String FLOW_ID = "flowId";
private static final String MANIFEST_HASH = "hash";
private static final String RESOURCE_HASH = "resourceHash";
@Mock
private C2ClientConfig clientConfig;
@ -66,6 +69,9 @@ public class C2HeartbeatFactoryTest {
@Mock
private ManifestHashProvider manifestHashProvider;
@Mock
private Supplier<ResourcesGlobalHash> resourcesGlobalHashSupplier;
@InjectMocks
private C2HeartbeatFactory c2HeartbeatFactory;
@ -82,26 +88,31 @@ public class C2HeartbeatFactoryTest {
when(flowIdHolder.getFlowId()).thenReturn(FLOW_ID);
when(clientConfig.getAgentClass()).thenReturn(AGENT_CLASS);
when(runtimeInfoWrapper.getManifest()).thenReturn(createManifest());
when(resourcesGlobalHashSupplier.get()).thenReturn(createResourcesGlobalHash());
C2Heartbeat heartbeat = c2HeartbeatFactory.create(runtimeInfoWrapper);
assertEquals(FLOW_ID, heartbeat.getFlowId());
assertEquals(AGENT_CLASS, heartbeat.getAgentClass());
assertEquals(RESOURCE_HASH, heartbeat.getResourceInfo().getHash());
}
@Test
void testCreateGeneratesAgentAndDeviceIdIfNotPresent() {
when(runtimeInfoWrapper.getManifest()).thenReturn(createManifest());
when(resourcesGlobalHashSupplier.get()).thenReturn(createResourcesGlobalHash());
C2Heartbeat heartbeat = c2HeartbeatFactory.create(runtimeInfoWrapper);
assertNotNull(heartbeat.getAgentId());
assertNotNull(heartbeat.getDeviceId());
assertEquals(RESOURCE_HASH, heartbeat.getResourceInfo().getHash());
}
@Test
void testCreatePopulatesFromRuntimeInfoWrapperForFullHeartbeat() {
when(clientConfig.isFullHeartbeat()).thenReturn(true);
when(resourcesGlobalHashSupplier.get()).thenReturn(createResourcesGlobalHash());
AgentRepositories repos = new AgentRepositories();
RuntimeManifest manifest = createManifest();
@ -112,11 +123,13 @@ public class C2HeartbeatFactoryTest {
assertEquals(repos, heartbeat.getAgentInfo().getStatus().getRepositories());
assertEquals(manifest, heartbeat.getAgentInfo().getAgentManifest());
assertEquals(queueStatus, heartbeat.getFlowInfo().getQueues());
assertEquals(RESOURCE_HASH, heartbeat.getResourceInfo().getHash());
}
@Test
void testCreatePopulatesFromRuntimeInfoWrapperForLightHeartbeat() {
when(clientConfig.isFullHeartbeat()).thenReturn(false);
when(resourcesGlobalHashSupplier.get()).thenReturn(createResourcesGlobalHash());
AgentRepositories repos = new AgentRepositories();
RuntimeManifest manifest = createManifest();
@ -127,6 +140,7 @@ public class C2HeartbeatFactoryTest {
assertEquals(repos, heartbeat.getAgentInfo().getStatus().getRepositories());
assertNull(heartbeat.getAgentInfo().getAgentManifest());
assertEquals(queueStatus, heartbeat.getFlowInfo().getQueues());
assertEquals(RESOURCE_HASH, heartbeat.getResourceInfo().getHash());
}
@Test
@ -140,10 +154,12 @@ public class C2HeartbeatFactoryTest {
void testAgentManifestHashIsPopulatedInCaseOfRuntimeManifest() {
RuntimeManifest manifest = createManifest();
when(manifestHashProvider.calculateManifestHash(manifest.getBundles(), Collections.emptySet())).thenReturn(MANIFEST_HASH);
when(resourcesGlobalHashSupplier.get()).thenReturn(createResourcesGlobalHash());
C2Heartbeat heartbeat = c2HeartbeatFactory.create(new RuntimeInfoWrapper(new AgentRepositories(), manifest, new HashMap<>()));
assertEquals(MANIFEST_HASH, heartbeat.getAgentInfo().getAgentManifestHash());
assertEquals(RESOURCE_HASH, heartbeat.getResourceInfo().getHash());
}
@Test
@ -154,10 +170,12 @@ public class C2HeartbeatFactoryTest {
Set<SupportedOperation> supportedOperations = Collections.singleton(supportedOperation);
manifest.setSupportedOperations(supportedOperations);
when(manifestHashProvider.calculateManifestHash(manifest.getBundles(), supportedOperations)).thenReturn(MANIFEST_HASH);
when(resourcesGlobalHashSupplier.get()).thenReturn(createResourcesGlobalHash());
C2Heartbeat heartbeat = c2HeartbeatFactory.create(new RuntimeInfoWrapper(new AgentRepositories(), manifest, new HashMap<>()));
assertEquals(MANIFEST_HASH, heartbeat.getAgentInfo().getAgentManifestHash());
assertEquals(RESOURCE_HASH, heartbeat.getResourceInfo().getHash());
}
private RuntimeManifest createManifest() {
@ -167,7 +185,12 @@ public class C2HeartbeatFactoryTest {
private RuntimeManifest createManifest(Bundle... bundles) {
RuntimeManifest manifest = new RuntimeManifest();
manifest.setBundles(Arrays.asList(bundles));
return manifest;
}
private ResourcesGlobalHash createResourcesGlobalHash() {
ResourcesGlobalHash resourcesGlobalHash = new ResourcesGlobalHash();
resourcesGlobalHash.setDigest(RESOURCE_HASH);
return resourcesGlobalHash;
}
}

View File

@ -0,0 +1,167 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.c2.client.service.operation;
import static java.util.Optional.empty;
import static java.util.Optional.ofNullable;
import static org.apache.nifi.c2.client.service.operation.SyncResourceOperationHandler.GLOBAL_HASH_FIELD;
import static org.apache.nifi.c2.client.service.operation.SyncResourceOperationHandler.RESOURCE_LIST_FIELD;
import static org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.NOT_APPLIED;
import static org.apache.nifi.c2.protocol.api.OperandType.RESOURCE;
import static org.apache.nifi.c2.protocol.api.OperationType.SYNC;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Stream;
import org.apache.nifi.c2.client.api.C2Client;
import org.apache.nifi.c2.protocol.api.C2Operation;
import org.apache.nifi.c2.protocol.api.C2OperationAck;
import org.apache.nifi.c2.protocol.api.C2OperationState.OperationState;
import org.apache.nifi.c2.protocol.api.ResourceItem;
import org.apache.nifi.c2.protocol.api.ResourcesGlobalHash;
import org.apache.nifi.c2.serializer.C2Serializer;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
@ExtendWith(MockitoExtension.class)
public class SyncResourceOperationHandlerTest {
private static final String OPERATION_ID = "operationId";
@Mock
private C2Client mockC2Client;
@Mock
private OperandPropertiesProvider mockOperandPropertiesProvider;
@Mock
private SyncResourceStrategy mockSyncResourceStrategy;
@Mock
private C2Serializer mockC2Serializer;
@InjectMocks
private SyncResourceOperationHandler testHandler;
@ParameterizedTest(name = "c2Client={0} operandPropertiesProvider={1} syncResourceStrategy={2} c2Serializer={3}")
@MethodSource("invalidConstructorArguments")
public void testAttemptingCreateWithInvalidParametersWillThrowException(C2Client c2Client, OperandPropertiesProvider operandPropertiesProvider,
SyncResourceStrategy syncResourceStrategy, C2Serializer c2Serializer) {
assertThrows(IllegalArgumentException.class, () -> SyncResourceOperationHandler.create(c2Client, operandPropertiesProvider, syncResourceStrategy, c2Serializer));
}
@Test
public void testOperationAndOperandTypesAreMatching() {
assertEquals(SYNC, testHandler.getOperationType());
assertEquals(RESOURCE, testHandler.getOperandType());
}
@Test
public void testResourcesGlobalHashArgumentIsNull() {
C2Operation inputOperation = operation(null, List.of(new ResourceItem()));
C2OperationAck c2OperationAck = testHandler.handle(inputOperation);
assertEquals(OPERATION_ID, c2OperationAck.getOperationId());
assertEquals(NOT_APPLIED, c2OperationAck.getOperationState().getState());
verify(mockSyncResourceStrategy, never()).synchronizeResourceRepository(any(), anyList(), any(), any());
}
@Test
public void testResourceListArgumentIsNull() {
ResourcesGlobalHash resourcesGlobalHash = new ResourcesGlobalHash();
C2Operation inputOperation = operation(resourcesGlobalHash, null);
when(mockC2Serializer.convert(eq(resourcesGlobalHash), any())).thenReturn(ofNullable(resourcesGlobalHash));
C2OperationAck c2OperationAck = testHandler.handle(inputOperation);
assertEquals(OPERATION_ID, c2OperationAck.getOperationId());
assertEquals(NOT_APPLIED, c2OperationAck.getOperationState().getState());
verify(mockSyncResourceStrategy, never()).synchronizeResourceRepository(any(), anyList(), any(), any());
}
@Test
public void testArgumentConversionFailure() {
ResourcesGlobalHash resourcesGlobalHash = new ResourcesGlobalHash();
List<ResourceItem> resourceItems = List.of(new ResourceItem());
C2Operation inputOperation = operation(resourcesGlobalHash, resourceItems);
when(mockC2Serializer.convert(eq(resourcesGlobalHash), any())).thenReturn(empty());
C2OperationAck c2OperationAck = testHandler.handle(inputOperation);
assertEquals(OPERATION_ID, c2OperationAck.getOperationId());
assertEquals(NOT_APPLIED, c2OperationAck.getOperationState().getState());
verify(mockSyncResourceStrategy, never()).synchronizeResourceRepository(any(), anyList(), any(), any());
}
@ParameterizedTest(name = "operationState={0}")
@MethodSource("synchronizeStrategyArguments")
public void testSynchronizeResourceStrategyExecutions(OperationState operationState) {
ResourcesGlobalHash resourcesGlobalHash = new ResourcesGlobalHash();
List<ResourceItem> resourceItems = List.of(new ResourceItem());
C2Operation inputOperation = operation(resourcesGlobalHash, resourceItems);
when(mockC2Serializer.convert(eq(resourcesGlobalHash), any())).thenReturn(Optional.of(resourcesGlobalHash));
when(mockC2Serializer.convert(eq(resourceItems), any())).thenReturn(Optional.of(resourceItems));
when(mockSyncResourceStrategy.synchronizeResourceRepository(eq(resourcesGlobalHash), eq(resourceItems), any(), any())).thenReturn(operationState);
C2OperationAck c2OperationAck = testHandler.handle(inputOperation);
assertEquals(OPERATION_ID, c2OperationAck.getOperationId());
assertEquals(operationState, c2OperationAck.getOperationState().getState());
}
private static Stream<Arguments> invalidConstructorArguments() {
return Stream.of(
Arguments.of(null, null, null, null),
Arguments.of(mock(C2Client.class), null, null, null),
Arguments.of(mock(C2Client.class), mock(OperandPropertiesProvider.class), null, null),
Arguments.of(mock(C2Client.class), mock(OperandPropertiesProvider.class), mock(SyncResourceStrategy.class), null),
Arguments.of(mock(C2Client.class), mock(OperandPropertiesProvider.class), null, mock(C2Serializer.class)));
}
private static Stream<Arguments> synchronizeStrategyArguments() {
return Stream.of(OperationState.values()).map(Arguments::of);
}
private C2Operation operation(ResourcesGlobalHash resourcesGlobalHash, List<ResourceItem> resourceItems) {
C2Operation c2Operation = new C2Operation();
c2Operation.setIdentifier(OPERATION_ID);
Map<String, Object> arguments = new HashMap<>();
arguments.put(GLOBAL_HASH_FIELD, resourcesGlobalHash);
arguments.put(RESOURCE_LIST_FIELD, resourceItems);
c2Operation.setArgs(arguments);
return c2Operation;
}
}

View File

@ -211,7 +211,7 @@ public class UpdateAssetOperationHandlerTest {
C2Operation c2Operation = new C2Operation();
c2Operation.setIdentifier(OPERATION_ID);
Map<String, String> arguments = new HashMap<>();
Map<String, Object> arguments = new HashMap<>();
arguments.put(ASSET_URL_KEY, assetUrl);
arguments.put(ASSET_FILE_KEY, assetFile);
arguments.put(ASSET_FORCE_DOWNLOAD_KEY, forceDownload);

View File

@ -48,8 +48,8 @@ public class UpdateConfigurationOperationHandlerTest {
private static final String CORRECT_LOCATION = "/path/for/the/" + FLOW_ID;
private static final String INCORRECT_LOCATION = "incorrect/location";
private static final Map<String, String> CORRECT_LOCATION_MAP = Collections.singletonMap(LOCATION, CORRECT_LOCATION);
private static final Map<String, String> INCORRECT_LOCATION_MAP = Collections.singletonMap(LOCATION, INCORRECT_LOCATION);
private static final Map<String, Object> CORRECT_LOCATION_MAP = Collections.singletonMap(LOCATION, CORRECT_LOCATION);
private static final Map<String, Object> INCORRECT_LOCATION_MAP = Collections.singletonMap(LOCATION, INCORRECT_LOCATION);
@Mock
private FlowIdHolder flowIdHolder;
@ -89,7 +89,7 @@ public class UpdateConfigurationOperationHandlerTest {
C2Operation operation = new C2Operation();
operation.setIdentifier(OPERATION_ID);
Map<String, String> args = new HashMap<>();
Map<String, Object> args = new HashMap<>();
args.putAll(INCORRECT_LOCATION_MAP);
args.put(FLOW_ID, "argsFlowId");
operation.setArgs(args);

View File

@ -41,13 +41,13 @@ import org.mockito.junit.jupiter.MockitoExtension;
public class UpdatePropertiesOperationHandlerTest {
private static final String ID = "id";
private static final Map<String, String> ARGS = Collections.singletonMap("key", "value");
private static final Map<String, Object> ARGS = Collections.singletonMap("key", "value");
@Mock
private OperandPropertiesProvider operandPropertiesProvider;
@Mock
private Function<Map<String, String>, Boolean> persistProperties;
private Function<Map<String, Object>, Boolean> persistProperties;
@InjectMocks
private UpdatePropertiesOperationHandler updatePropertiesOperationHandler;

View File

@ -34,6 +34,7 @@ public class C2Heartbeat implements Serializable {
private DeviceInfo deviceInfo;
private AgentInfo agentInfo;
private FlowInfo flowInfo;
private ResourceInfo resourceInfo;
@Schema(hidden = true)
public String getIdentifier() {
@ -80,6 +81,15 @@ public class C2Heartbeat implements Serializable {
this.flowInfo = flowInfo;
}
@Schema(description = "Metadata for the resources currently deployed to the agent")
public ResourceInfo getResourceInfo() {
return resourceInfo;
}
public void setResourceInfo(ResourceInfo resourceInfo) {
this.resourceInfo = resourceInfo;
}
// Convenience getters
@Schema(hidden = true)
public String getDeviceId() {
@ -139,5 +149,4 @@ public class C2Heartbeat implements Serializable {
public int hashCode() {
return Objects.hash(identifier);
}
}

View File

@ -32,7 +32,7 @@ public class C2Operation implements Serializable {
private String identifier;
private OperationType operation;
private OperandType operand;
private Map<String, String> args;
private Map<String, Object> args;
private Set<String> dependencies;
@Schema(description = "A unique identifier for the operation", accessMode = Schema.AccessMode.READ_ONLY)
@ -79,11 +79,11 @@ public class C2Operation implements Serializable {
"The syntax and semantics of these arguments is defined per operation in" +
"the C2 protocol and possibly extended by an agent's implementation of the" +
"C2 protocol.")
public Map<String, String> getArgs() {
public Map<String, Object> getArgs() {
return args;
}
public void setArgs(Map<String, String> args) {
public void setArgs(Map<String, Object> args) {
this.args = args;
}

View File

@ -18,7 +18,6 @@
package org.apache.nifi.c2.protocol.api;
import io.swagger.v3.oas.annotations.media.Schema;
import java.io.Serializable;
import java.util.Objects;
@ -41,6 +40,8 @@ public class C2OperationAck implements Serializable {
@Schema(description = "Optionally, an ack can include flow info that is relevant to the operation being acknowledged")
private FlowInfo flowInfo;
@Schema(description = "Optionally, an ack can include resource info that is relevant to the operation being acknowledged")
private ResourceInfo resourceInfo;
public String getOperationId() {
return operationId;
@ -82,6 +83,14 @@ public class C2OperationAck implements Serializable {
this.flowInfo = flowInfo;
}
public ResourceInfo getResourceInfo() {
return resourceInfo;
}
public void setResourceInfo(ResourceInfo resourceInfo) {
this.resourceInfo = resourceInfo;
}
@Override
public boolean equals(Object o) {
if (this == o) {
@ -91,13 +100,14 @@ public class C2OperationAck implements Serializable {
return false;
}
C2OperationAck that = (C2OperationAck) o;
return Objects.equals(operationId, that.operationId) && Objects.equals(operationState, that.operationState) && Objects.equals(deviceInfo,
that.deviceInfo) && Objects.equals(agentInfo, that.agentInfo) && Objects.equals(flowInfo, that.flowInfo);
return Objects.equals(operationId, that.operationId) && Objects.equals(operationState, that.operationState)
&& Objects.equals(deviceInfo, that.deviceInfo) && Objects.equals(agentInfo, that.agentInfo) && Objects.equals(flowInfo, that.flowInfo)
&& Objects.equals(resourceInfo, that.resourceInfo);
}
@Override
public int hashCode() {
return Objects.hash(operationId, operationState, deviceInfo, agentInfo, flowInfo);
return Objects.hash(operationId, operationState, deviceInfo, agentInfo, flowInfo, resourceInfo);
}
@Override
@ -108,6 +118,7 @@ public class C2OperationAck implements Serializable {
", deviceInfo=" + deviceInfo +
", agentInfo=" + agentInfo +
", flowInfo=" + flowInfo +
", resourceInfo=" + resourceInfo +
'}';
}
}

View File

@ -28,7 +28,8 @@ public enum OperandType {
MANIFEST,
REPOSITORY,
PROPERTIES,
ASSET;
ASSET,
RESOURCE;
public static Optional<OperandType> fromString(String value) {
return Arrays.stream(values())

View File

@ -23,6 +23,7 @@ import static org.apache.nifi.c2.protocol.api.OperandType.CONNECTION;
import static org.apache.nifi.c2.protocol.api.OperandType.DEBUG;
import static org.apache.nifi.c2.protocol.api.OperandType.MANIFEST;
import static org.apache.nifi.c2.protocol.api.OperandType.PROPERTIES;
import static org.apache.nifi.c2.protocol.api.OperandType.RESOURCE;
import java.util.Arrays;
import java.util.Set;
@ -44,6 +45,7 @@ public enum OperationType {
PAUSE,
REPLICATE,
SUBSCRIBE,
SYNC(RESOURCE),
TRANSFER(DEBUG);
private final Set<OperandType> supportedOperands;

View File

@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.c2.protocol.api;
import io.swagger.v3.oas.annotations.media.Schema;
import java.io.Serializable;
import java.util.Objects;
public class ResourceInfo implements Serializable {
private static final long serialVersionUID = 7576080726533697542L;
private String hash;
@Schema(description = "A global hash calculated from all the available resources on the agent", required = true)
public String getHash() {
return hash;
}
public void setHash(String hash) {
this.hash = hash;
}
@Override
public String toString() {
return "ResourceInfo{" +
"hash='" + hash + '\'' +
'}';
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
ResourceInfo resourceInfo = (ResourceInfo) o;
return Objects.equals(hash, resourceInfo.hash);
}
@Override
public int hashCode() {
return Objects.hash(hash);
}
}

View File

@ -0,0 +1,132 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.c2.protocol.api;
import io.swagger.v3.oas.annotations.media.Schema;
import java.io.Serializable;
import java.util.Objects;
public class ResourceItem implements Serializable {
private static final long serialVersionUID = 1L;
private String resourceId;
private String resourceName;
private ResourceType resourceType;
private String resourcePath;
private String digest;
private String hashType;
private String url;
@Schema(description = "The identifier of the resource to be synced")
public String getResourceId() {
return resourceId;
}
public void setResourceId(String resourceId) {
this.resourceId = resourceId;
}
@Schema(description = "The name of the asset to be synced")
public String getResourceName() {
return resourceName;
}
public void setResourceName(String resourceName) {
this.resourceName = resourceName;
}
@Schema(description = "The type of the asset to be synced")
public ResourceType getResourceType() {
return resourceType;
}
public void setResourceType(ResourceType resourceType) {
this.resourceType = resourceType;
}
@Schema(description = "The relative path of the asset on the agent")
public String getResourcePath() {
return resourcePath;
}
public void setResourcePath(String resourcePath) {
this.resourcePath = resourcePath;
}
@Schema(description = "The checksum hash value calculated for the particular asset")
public String getDigest() {
return digest;
}
public void setDigest(String digest) {
this.digest = digest;
}
@Schema(description = "The type of the hashing algorithm used to calculate the checksum")
public String getHashType() {
return hashType;
}
public void setHashType(String hashType) {
this.hashType = hashType;
}
@Schema(description = "The relative url of the asset, from where the agent can download from")
public String getUrl() {
return url;
}
public void setUrl(String url) {
this.url = url;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
ResourceItem that = (ResourceItem) o;
return Objects.equals(resourceId, that.resourceId) && Objects.equals(resourceName, that.resourceName) && Objects.equals(resourceType, that.resourceType)
&& Objects.equals(resourcePath, that.resourcePath) && Objects.equals(digest, that.digest) && Objects.equals(hashType, that.hashType)
&& Objects.equals(url, that.url);
}
@Override
public int hashCode() {
return Objects.hash(resourceId, resourceName, resourceType, resourcePath, digest, hashType, url);
}
@Override
public String toString() {
return "ResourceItem{" +
"resourceId='" + resourceId + '\'' +
", resourceName='" + resourceName + '\'' +
", resourceType=" + resourceType +
", resourcePath='" + resourcePath + '\'' +
", digest='" + digest + '\'' +
", hashType='" + hashType + '\'' +
", url='" + url + '\'' +
'}';
}
}

View File

@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.c2.protocol.api;
public enum ResourceType {
ASSET,
EXTENSION
}

View File

@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.c2.protocol.api;
import io.swagger.v3.oas.annotations.media.Schema;
import java.io.Serializable;
import java.util.Objects;
public class ResourcesGlobalHash implements Serializable {
private static final long serialVersionUID = 1L;
private String digest;
private String hashType;
@Schema(description = "The calculated hash digest value of asset repository")
public String getDigest() {
return digest;
}
public void setDigest(String digest) {
this.digest = digest;
}
@Schema(description = "The type of the hashing algorithm used to calculate the hash digest")
public String getHashType() {
return hashType;
}
public void setHashType(String hashType) {
this.hashType = hashType;
}
@Override
public String toString() {
return "ResourcesGlobalHash{" +
"digest='" + digest + '\'' +
", hashType='" + hashType + '\'' +
'}';
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
ResourcesGlobalHash that = (ResourcesGlobalHash) o;
return Objects.equals(digest, that.digest) && Objects.equals(hashType, that.hashType);
}
@Override
public int hashCode() {
return Objects.hash(digest, hashType);
}
}

View File

@ -58,6 +58,7 @@ import static org.apache.nifi.util.NiFiProperties.SENSITIVE_PROPS_ALGORITHM;
import static org.apache.nifi.util.NiFiProperties.SENSITIVE_PROPS_KEY;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.nio.file.Path;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
@ -79,6 +80,7 @@ import org.apache.nifi.c2.client.service.operation.EmptyOperandPropertiesProvide
import org.apache.nifi.c2.client.service.operation.OperandPropertiesProvider;
import org.apache.nifi.c2.client.service.operation.OperationQueueDAO;
import org.apache.nifi.c2.client.service.operation.SupportedOperationsProvider;
import org.apache.nifi.c2.client.service.operation.SyncResourceOperationHandler;
import org.apache.nifi.c2.client.service.operation.TransferDebugOperationHandler;
import org.apache.nifi.c2.client.service.operation.UpdateAssetOperationHandler;
import org.apache.nifi.c2.client.service.operation.UpdateConfigurationOperationHandler;
@ -89,6 +91,7 @@ import org.apache.nifi.c2.protocol.api.AgentRepositories;
import org.apache.nifi.c2.protocol.api.AgentRepositoryStatus;
import org.apache.nifi.c2.protocol.api.FlowQueueStatus;
import org.apache.nifi.c2.serializer.C2JacksonSerializer;
import org.apache.nifi.c2.serializer.C2Serializer;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.diagnostics.SystemDiagnostics;
import org.apache.nifi.encrypt.PropertyEncryptorBuilder;
@ -101,6 +104,9 @@ import org.apache.nifi.minifi.c2.command.PropertiesPersister;
import org.apache.nifi.minifi.c2.command.TransferDebugCommandHelper;
import org.apache.nifi.minifi.c2.command.UpdateAssetCommandHelper;
import org.apache.nifi.minifi.c2.command.UpdatePropertiesPropertyProvider;
import org.apache.nifi.minifi.c2.command.syncresource.DefaultSyncResourceStrategy;
import org.apache.nifi.minifi.c2.command.syncresource.FileResourceRepository;
import org.apache.nifi.minifi.c2.command.syncresource.ResourceRepository;
import org.apache.nifi.minifi.commons.api.MiNiFiProperties;
import org.apache.nifi.minifi.commons.service.FlowPropertyEncryptor;
import org.apache.nifi.minifi.commons.service.StandardFlowEnrichService;
@ -147,13 +153,16 @@ public class C2NifiClientService {
);
this.heartbeatPeriod = clientConfig.getHeartbeatPeriod();
this.flowController = flowController;
C2HttpClient client = C2HttpClient.create(clientConfig, new C2JacksonSerializer());
C2Serializer c2Serializer = new C2JacksonSerializer();
ResourceRepository resourceRepository =
new FileResourceRepository(Path.of(clientConfig.getC2AssetDirectory()), niFiProperties.getNarAutoLoadDirectory().toPath(),
niFiProperties.getFlowConfigurationFileDir().toPath(), c2Serializer);
C2HttpClient client = C2HttpClient.create(clientConfig, c2Serializer);
FlowIdHolder flowIdHolder = new FlowIdHolder(clientConfig.getConfDirectory());
C2HeartbeatFactory heartbeatFactory = new C2HeartbeatFactory(clientConfig, flowIdHolder, new ManifestHashProvider());
C2HeartbeatFactory heartbeatFactory = new C2HeartbeatFactory(clientConfig, flowIdHolder, new ManifestHashProvider(), resourceRepository::findResourcesGlobalHash);
String bootstrapConfigFileLocation = niFiProperties.getProperty("nifi.minifi.bootstrap.file");
C2OperationHandlerProvider c2OperationHandlerProvider = c2OperationHandlerProvider(niFiProperties, flowController, flowService, flowIdHolder,
client, heartbeatFactory, bootstrapConfigFileLocation, clientConfig.getC2AssetDirectory());
client, heartbeatFactory, bootstrapConfigFileLocation, clientConfig.getC2AssetDirectory(), c2Serializer, resourceRepository);
this.supportedOperationsProvider = new SupportedOperationsProvider(c2OperationHandlerProvider.getHandlers());
@ -206,7 +215,8 @@ public class C2NifiClientService {
private C2OperationHandlerProvider c2OperationHandlerProvider(NiFiProperties niFiProperties, FlowController flowController, FlowService flowService,
FlowIdHolder flowIdHolder, C2HttpClient client, C2HeartbeatFactory heartbeatFactory,
String bootstrapConfigFileLocation, String c2AssetDirectory) {
String bootstrapConfigFileLocation, String c2AssetDirectory, C2Serializer c2Serializer,
ResourceRepository resourceRepository) {
OperandPropertiesProvider emptyOperandPropertiesProvider = new EmptyOperandPropertiesProvider();
TransferDebugCommandHelper transferDebugCommandHelper = new TransferDebugCommandHelper(niFiProperties);
UpdateAssetCommandHelper updateAssetCommandHelper = new UpdateAssetCommandHelper(c2AssetDirectory);
@ -229,7 +239,8 @@ public class C2NifiClientService {
transferDebugCommandHelper.debugBundleFiles(), transferDebugCommandHelper::excludeSensitiveText),
UpdateAssetOperationHandler.create(client, emptyOperandPropertiesProvider,
updateAssetCommandHelper::assetUpdatePrecondition, updateAssetCommandHelper::assetPersistFunction),
new UpdatePropertiesOperationHandler(updatePropertiesPropertyProvider, propertiesPersister::persistProperties)
new UpdatePropertiesOperationHandler(updatePropertiesPropertyProvider, propertiesPersister::persistProperties),
SyncResourceOperationHandler.create(client, emptyOperandPropertiesProvider, new DefaultSyncResourceStrategy(resourceRepository), c2Serializer)
));
}

View File

@ -63,7 +63,7 @@ public class PropertiesPersister {
this.bootstrapNewFile = new File(bootstrapFile.getParentFile() + "/" + BOOTSTRAP_UPDATED_FILE_NAME);
}
public Boolean persistProperties(Map<String, String> propertiesToUpdate) {
public Boolean persistProperties(Map<String, Object> propertiesToUpdate) {
int propertyCountToUpdate = validateProperties(propertiesToUpdate);
if (propertyCountToUpdate == 0) {
return false;
@ -99,12 +99,12 @@ public class PropertiesPersister {
return true;
}
private int validateProperties(Map<String, String> propertiesToUpdate) {
private int validateProperties(Map<String, Object> propertiesToUpdate) {
Set<UpdatableProperty> updatableProperties = (Set<UpdatableProperty>) updatePropertiesPropertyProvider.getProperties().get(AVAILABLE_PROPERTIES);
Map<String, UpdatableProperty> updatablePropertyMap = updatableProperties.stream().collect(Collectors.toMap(UpdatableProperty::getPropertyName, Function.identity()));
int propertyCountToUpdate = 0;
List<String> validationErrors = new ArrayList<>();
for (Map.Entry<String, String> entry : propertiesToUpdate.entrySet()) {
for (Map.Entry<String, Object> entry : propertiesToUpdate.entrySet()) {
UpdatableProperty updatableProperty = updatablePropertyMap.get(entry.getKey());
if (updatableProperty == null) {
validationErrors.add(String.format("You can not update the {} property through C2 protocol", entry.getKey()));
@ -112,7 +112,7 @@ public class PropertiesPersister {
}
if (!Objects.equals(updatableProperty.getPropertyValue(), entry.getValue())) {
if (!getValidator(updatableProperty.getValidator())
.map(validator -> validator.validate(entry.getKey(), entry.getValue(), validationContext))
.map(validator -> validator.validate(entry.getKey(), argToString(entry.getValue()), validationContext))
.map(ValidationResult::isValid)
.orElse(true)) {
validationErrors.add(String.format("Invalid value for %s", entry.getKey()));
@ -140,4 +140,8 @@ public class PropertiesPersister {
}
return Optional.empty();
}
private String argToString(Object argument) {
return argument instanceof String arg ? arg : null;
}
}

View File

@ -0,0 +1,161 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.minifi.c2.command.syncresource;
import static java.nio.file.Files.copy;
import static java.nio.file.Files.createTempFile;
import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
import static java.util.Map.entry;
import static java.util.Optional.empty;
import static java.util.UUID.randomUUID;
import static java.util.function.Predicate.not;
import static org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.FULLY_APPLIED;
import static org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.NOT_APPLIED;
import static org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.NO_OPERATION;
import static org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.PARTIALLY_APPLIED;
import static org.apache.nifi.c2.protocol.api.ResourceType.ASSET;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Path;
import java.util.List;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.function.BiFunction;
import java.util.function.Function;
import org.apache.nifi.c2.client.service.operation.SyncResourceStrategy;
import org.apache.nifi.c2.protocol.api.C2OperationState.OperationState;
import org.apache.nifi.c2.protocol.api.ResourceItem;
import org.apache.nifi.c2.protocol.api.ResourcesGlobalHash;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class DefaultSyncResourceStrategy implements SyncResourceStrategy {
private static final Logger LOG = LoggerFactory.getLogger(DefaultSyncResourceStrategy.class);
private static final Set<Entry<OperationState, OperationState>> SUCCESS_RESULT_PAIRS = Set.of(
entry(NO_OPERATION, NO_OPERATION),
entry(NO_OPERATION, FULLY_APPLIED),
entry(FULLY_APPLIED, NO_OPERATION),
entry(FULLY_APPLIED, FULLY_APPLIED));
private static final Set<Entry<OperationState, OperationState>> FAILED_RESULT_PAIRS = Set.of(
entry(NO_OPERATION, NOT_APPLIED),
entry(NOT_APPLIED, NO_OPERATION),
entry(NOT_APPLIED, NOT_APPLIED));
private static final String CHANGE_TO_PARENT_DIR_PATH_SEGMENT = "..";
private final ResourceRepository resourceRepository;
public DefaultSyncResourceStrategy(ResourceRepository resourceRepository) {
this.resourceRepository = resourceRepository;
}
@Override
public OperationState synchronizeResourceRepository(ResourcesGlobalHash c2GlobalHash, List<ResourceItem> c2ServerItems,
BiFunction<String, Function<InputStream, Optional<Path>>, Optional<Path>> resourceDownloadFunction,
Function<String, Optional<String>> urlEnrichFunction) {
Set<ResourceItem> c2Items = Set.copyOf(c2ServerItems);
Set<ResourceItem> agentItems = Set.copyOf(resourceRepository.findAllResourceItems());
OperationState deleteResult = deleteItems(c2Items, agentItems);
OperationState saveResult = saveNewItems(c2Items, agentItems, resourceDownloadFunction, urlEnrichFunction);
Entry<OperationState, OperationState> resultPair = entry(deleteResult, saveResult);
return SUCCESS_RESULT_PAIRS.contains(resultPair)
? saveGlobalHash(c2GlobalHash, deleteResult, saveResult)
: FAILED_RESULT_PAIRS.contains(resultPair) ? NOT_APPLIED : PARTIALLY_APPLIED;
}
private OperationState saveNewItems(Set<ResourceItem> c2Items, Set<ResourceItem> agentItems,
BiFunction<String, Function<InputStream, Optional<Path>>, Optional<Path>> resourceDownloadFunction,
Function<String, Optional<String>> urlEnrichFunction) {
List<ResourceItem> newItems = c2Items.stream().filter(not(agentItems::contains)).toList();
if (newItems.isEmpty()) {
return NO_OPERATION;
}
List<ResourceItem> addedItems = newItems.stream()
.filter(this::validate)
.map(downloadIfNotPresentAndAddToRepository(resourceDownloadFunction, urlEnrichFunction))
.flatMap(Optional::stream)
.toList();
return addedItems.isEmpty()
? NOT_APPLIED
: newItems.size() == addedItems.size() ? FULLY_APPLIED : PARTIALLY_APPLIED;
}
private Function<ResourceItem, Optional<ResourceItem>> downloadIfNotPresentAndAddToRepository(
BiFunction<String, Function<InputStream, Optional<Path>>, Optional<Path>> resourceDownloadFunction, Function<String, Optional<String>> urlEnrichFunction) {
return resourceItem -> resourceRepository.resourceItemBinaryPresent(resourceItem)
? resourceRepository.addResourceItem(resourceItem)
: urlEnrichFunction.apply(resourceItem.getUrl())
.flatMap(enrichedUrl -> resourceDownloadFunction.apply(enrichedUrl, this::persistToTemporaryLocation))
.flatMap(tempResourcePath -> resourceRepository.addResourceItem(resourceItem, tempResourcePath));
}
private boolean validate(ResourceItem resourceItem) {
if (resourceItem.getResourcePath() != null
&& resourceItem.getResourceType() == ASSET
&& resourceItem.getResourcePath().contains(CHANGE_TO_PARENT_DIR_PATH_SEGMENT)) {
LOG.error("Resource path should not contain '..' path segment in {}", resourceItem);
return false;
}
return true;
}
private Optional<Path> persistToTemporaryLocation(InputStream inputStream) {
String tempResourceId = randomUUID().toString();
try {
Path tempFile = createTempFile(tempResourceId, null);
copy(inputStream, tempFile, REPLACE_EXISTING);
return Optional.of(tempFile);
} catch (IOException e) {
LOG.error("Unable to download resource. Will retry in next heartbeat iteration", e);
return empty();
}
}
private OperationState deleteItems(Set<ResourceItem> c2Items, Set<ResourceItem> agentItems) {
List<ResourceItem> toDeleteItems = agentItems.stream().filter(not(c2Items::contains)).toList();
if (toDeleteItems.isEmpty()) {
return NO_OPERATION;
}
List<ResourceItem> deletedItems = toDeleteItems.stream()
.map(resourceRepository::deleteResourceItem)
.flatMap(Optional::stream)
.toList();
return deletedItems.isEmpty()
? NOT_APPLIED
: deletedItems.size() == toDeleteItems.size() ? FULLY_APPLIED : PARTIALLY_APPLIED;
}
private OperationState saveGlobalHash(ResourcesGlobalHash resourcesGlobalHash, OperationState deleteResult, OperationState saveResult) {
boolean isGlobalHashRefreshOnly = deleteResult == NO_OPERATION && saveResult == NO_OPERATION;
return resourceRepository.saveResourcesGlobalHash(resourcesGlobalHash)
.map(unused -> FULLY_APPLIED)
.orElse(isGlobalHashRefreshOnly ? NOT_APPLIED : PARTIALLY_APPLIED);
}
}

View File

@ -0,0 +1,229 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.minifi.c2.command.syncresource;
import static java.nio.file.Files.copy;
import static java.nio.file.Files.createDirectories;
import static java.nio.file.Files.deleteIfExists;
import static java.nio.file.Files.exists;
import static java.nio.file.Files.newInputStream;
import static java.nio.file.Files.readString;
import static java.nio.file.Files.writeString;
import static java.nio.file.StandardCopyOption.COPY_ATTRIBUTES;
import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
import static java.nio.file.StandardOpenOption.CREATE;
import static java.nio.file.StandardOpenOption.SYNC;
import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING;
import static java.nio.file.StandardOpenOption.WRITE;
import static java.util.Optional.empty;
import static java.util.Optional.of;
import static java.util.Optional.ofNullable;
import static java.util.function.Predicate.not;
import static java.util.stream.Collectors.toList;
import static org.apache.commons.codec.digest.DigestUtils.md5Hex;
import static org.apache.commons.codec.digest.DigestUtils.sha1Hex;
import static org.apache.commons.codec.digest.DigestUtils.sha256Hex;
import static org.apache.commons.codec.digest.DigestUtils.sha512Hex;
import static org.apache.commons.codec.digest.MessageDigestAlgorithms.MD5;
import static org.apache.commons.codec.digest.MessageDigestAlgorithms.SHA_1;
import static org.apache.commons.codec.digest.MessageDigestAlgorithms.SHA_256;
import static org.apache.commons.codec.digest.MessageDigestAlgorithms.SHA_512;
import static org.apache.commons.io.file.PathUtils.createParentDirectories;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import org.apache.nifi.c2.protocol.api.ResourceItem;
import org.apache.nifi.c2.protocol.api.ResourcesGlobalHash;
import org.apache.nifi.c2.serializer.C2Serializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class FileResourceRepository implements ResourceRepository {
static final String ASSET_REPOSITORY_DIRECTORY = "repository";
static final String RESOURCE_REPOSITORY_FILE_NAME = "resources.json";
private static final Logger LOG = LoggerFactory.getLogger(FileResourceRepository.class);
private static final ResourceRepositoryDescriptor EMTPY_HOLDER = new ResourceRepositoryDescriptor(new ResourcesGlobalHash(), List.of());
private final Path assetRepositoryDirectory;
private final Path extensionDirectory;
private final Path resourceRepositoryFile;
private final C2Serializer c2Serializer;
private ResourceRepositoryDescriptor resourceRepositoryDescriptor;
public FileResourceRepository(Path assetDirectory, Path extensionDirectory, Path configDirectory, C2Serializer c2Serializer) {
this.resourceRepositoryFile = configDirectory.resolve(RESOURCE_REPOSITORY_FILE_NAME);
this.c2Serializer = c2Serializer;
this.assetRepositoryDirectory = assetDirectory.resolve(ASSET_REPOSITORY_DIRECTORY);
this.extensionDirectory = extensionDirectory;
initialize();
}
@Override
public synchronized ResourcesGlobalHash findResourcesGlobalHash() {
return resourceRepositoryDescriptor.resourcesGlobalHash();
}
@Override
public synchronized Optional<ResourcesGlobalHash> saveResourcesGlobalHash(ResourcesGlobalHash resourcesGlobalHash) {
ResourceRepositoryDescriptor newRepositoryDescriptor = new ResourceRepositoryDescriptor(resourcesGlobalHash, resourceRepositoryDescriptor.resourceItems());
try {
persist(newRepositoryDescriptor);
return Optional.of(resourcesGlobalHash);
} catch (IOException e) {
LOG.error("Unable to save global resource hash data", e);
return empty();
}
}
@Override
public synchronized List<ResourceItem> findAllResourceItems() {
return resourceRepositoryDescriptor.resourceItems();
}
@Override
public synchronized boolean resourceItemBinaryPresent(ResourceItem resourceItem) {
Path path = resourcePath(resourceItem);
return exists(path) && calculateDigest(path, resourceItem.getHashType()).equals(resourceItem.getDigest());
}
@Override
public synchronized Optional<ResourceItem> addResourceItem(ResourceItem resourceItem) {
try {
List<ResourceItem> newItems = new ArrayList<>(resourceRepositoryDescriptor.resourceItems());
newItems.add(resourceItem);
ResourceRepositoryDescriptor newRepositoryDescriptor = new ResourceRepositoryDescriptor(resourceRepositoryDescriptor.resourcesGlobalHash(), newItems);
persist(newRepositoryDescriptor);
return of(resourceItem);
} catch (IOException e) {
LOG.error("Unable to persist repository metadata", e);
return empty();
}
}
@Override
public synchronized Optional<ResourceItem> addResourceItem(ResourceItem resourceItem, Path source) {
Path resourcePath = resourcePath(resourceItem);
try {
createParentDirectories(resourcePath);
copy(source, resourcePath, REPLACE_EXISTING, COPY_ATTRIBUTES);
} catch (IOException e) {
LOG.error("Unable to move resource to final location. Syncing this asset will be retried in next heartbeat iteration", e);
return empty();
} finally {
deleteSilently(source, "Unable to clear temporary file");
}
Optional<ResourceItem> addedItem = addResourceItem(resourceItem);
if (addedItem.isEmpty()) {
deleteSilently(resourcePath, "Unable to cleanup resource file");
}
return addedItem;
}
@Override
public synchronized Optional<ResourceItem> deleteResourceItem(ResourceItem resourceItem) {
List<ResourceItem> truncatedItems = resourceRepositoryDescriptor.resourceItems()
.stream()
.filter(syncAssetItem -> !syncAssetItem.getResourceId().equals(resourceItem.getResourceId()))
.collect(toList());
ResourceRepositoryDescriptor newRepositoryDescriptor = new ResourceRepositoryDescriptor(resourceRepositoryDescriptor.resourcesGlobalHash(), truncatedItems);
try {
persist(newRepositoryDescriptor);
} catch (Exception e) {
LOG.error("Unable to persist repository metadata", e);
return empty();
}
Path resourcePath = resourcePath(resourceItem);
deleteSilently(resourcePath, "Unable to delete resource file");
return Optional.of(resourceItem);
}
private void initialize() {
try {
createDirectories(assetRepositoryDirectory);
createDirectories(extensionDirectory);
if (!exists(resourceRepositoryFile)) {
persist(EMTPY_HOLDER);
} else {
load();
}
} catch (IOException e) {
throw new RuntimeException("Unable to initialize resource repository", e);
}
}
private void persist(ResourceRepositoryDescriptor descriptor) throws IOException {
String serializedDescriptor =
c2Serializer.serialize(descriptor).orElseThrow(() -> new IllegalStateException("Unable to serialize repository descriptor object"));
writeString(resourceRepositoryFile, serializedDescriptor, CREATE, TRUNCATE_EXISTING, WRITE, SYNC);
resourceRepositoryDescriptor = descriptor;
}
private void load() throws IOException {
String rawResourceRepository = readString(resourceRepositoryFile);
resourceRepositoryDescriptor = c2Serializer.deserialize(rawResourceRepository, ResourceRepositoryDescriptor.class)
.orElseThrow(() -> new IllegalStateException("Unable to deserialize repository descriptor object"));
}
private Path resourcePath(ResourceItem resourceItem) {
return switch (resourceItem.getResourceType()) {
case ASSET -> ofNullable(resourceItem.getResourcePath())
.filter(not(String::isBlank))
.map(assetRepositoryDirectory::resolve)
.orElse(assetRepositoryDirectory)
.resolve(resourceItem.getResourceName());
case EXTENSION -> extensionDirectory.resolve(resourceItem.getResourceName());
};
}
private void deleteSilently(Path path, String errorMessage) {
try {
deleteIfExists(path);
} catch (IOException e) {
LOG.error(errorMessage, e);
}
}
private String calculateDigest(Path path, String digestAlgorithm) {
try (InputStream inputStream = newInputStream(path)) {
return switch (digestAlgorithm) {
case MD5 -> md5Hex(inputStream);
case SHA_1 -> sha1Hex(inputStream);
case SHA_256 -> sha256Hex(inputStream);
case SHA_512 -> sha512Hex(inputStream);
default -> throw new Exception("Unsupported digest algorithm: " + digestAlgorithm);
};
} catch (Exception e) {
throw new RuntimeException("Unable to calculate digest for resource", e);
}
}
record ResourceRepositoryDescriptor(ResourcesGlobalHash resourcesGlobalHash, List<ResourceItem> resourceItems) {
}
}

View File

@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.minifi.c2.command.syncresource;
import java.nio.file.Path;
import java.util.List;
import java.util.Optional;
import org.apache.nifi.c2.protocol.api.ResourcesGlobalHash;
import org.apache.nifi.c2.protocol.api.ResourceItem;
public interface ResourceRepository {
ResourcesGlobalHash findResourcesGlobalHash();
Optional<ResourcesGlobalHash> saveResourcesGlobalHash(ResourcesGlobalHash resourcesGlobalHash);
List<ResourceItem> findAllResourceItems();
boolean resourceItemBinaryPresent(ResourceItem resourceItem);
Optional<ResourceItem> addResourceItem(ResourceItem resourceItem);
Optional<ResourceItem> addResourceItem(ResourceItem resourceItem, Path source);
Optional<ResourceItem> deleteResourceItem(ResourceItem resourceItem);
}

View File

@ -0,0 +1,312 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.minifi.c2.command.syncresource;
import static java.lang.Boolean.TRUE;
import static java.nio.file.Files.createTempFile;
import static java.util.Optional.empty;
import static java.util.Optional.ofNullable;
import static org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.FULLY_APPLIED;
import static org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.NOT_APPLIED;
import static org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.PARTIALLY_APPLIED;
import static org.apache.nifi.c2.protocol.api.ResourceType.ASSET;
import static org.apache.nifi.c2.protocol.api.ResourceType.EXTENSION;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.Optional;
import java.util.function.BiFunction;
import java.util.function.Function;
import org.apache.nifi.c2.protocol.api.C2OperationState.OperationState;
import org.apache.nifi.c2.protocol.api.ResourceItem;
import org.apache.nifi.c2.protocol.api.ResourceType;
import org.apache.nifi.c2.protocol.api.ResourcesGlobalHash;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.MockedStatic;
import org.mockito.junit.jupiter.MockitoExtension;
@ExtendWith(MockitoExtension.class)
public class DefaultSyncResourceStrategyTest {
private static final ResourcesGlobalHash C2_GLOBAL_HASH = resourcesGlobalHash("digest1");
private static final String FAIL_DOWNLOAD_URL = "fail";
private static final BiFunction<String, Function<InputStream, Optional<Path>>, Optional<Path>> URL_TO_CONTENT_DOWNLOAD_FUNCTION =
(url, persistFunction) -> url.endsWith(FAIL_DOWNLOAD_URL) ? empty() : persistFunction.apply(new ByteArrayInputStream(url.getBytes()));
private static String ENRICH_PREFIX = "pre_";
private static final Function<String, Optional<String>> PREFIXING_ENRICH_FUNCTION = url -> ofNullable(url).map(arg -> ENRICH_PREFIX + arg);
@Mock
private ResourceRepository mockResourceRepository;
private DefaultSyncResourceStrategy testSyncResourceStrategy;
@BeforeEach
public void setup() {
testSyncResourceStrategy = new DefaultSyncResourceStrategy(mockResourceRepository);
}
@Test
public void testAddingNewItems() {
List<ResourceItem> c2Items = List.of(
resourceItem("resource1", "url1", null, ASSET),
resourceItem("resource2", "url2", "", ASSET),
resourceItem("resource3", "url3", "path3", ASSET),
resourceItem("resource4", "url4", null, EXTENSION),
resourceItem("resource5", "url5", "path5", EXTENSION)
);
when(mockResourceRepository.findAllResourceItems()).thenReturn(List.of());
when(mockResourceRepository.saveResourcesGlobalHash(C2_GLOBAL_HASH)).thenReturn(Optional.of(C2_GLOBAL_HASH));
c2Items.forEach(resourceItem -> {
try {
when(mockResourceRepository.addResourceItem(eq(resourceItem), any())).thenReturn(Optional.of(resourceItem));
} catch (Exception e) {
}
});
OperationState resultState =
testSyncResourceStrategy.synchronizeResourceRepository(C2_GLOBAL_HASH, c2Items, URL_TO_CONTENT_DOWNLOAD_FUNCTION, PREFIXING_ENRICH_FUNCTION);
assertEquals(FULLY_APPLIED, resultState);
try {
verify(mockResourceRepository, never()).deleteResourceItem(any());
} catch (Exception e) {
}
}
@Test
public void testAddingNewItemWhenBinaryPresent() {
ResourceItem resourceItem = resourceItem("resource1", "url1", null, ASSET);
when(mockResourceRepository.findAllResourceItems()).thenReturn(List.of());
when(mockResourceRepository.saveResourcesGlobalHash(C2_GLOBAL_HASH)).thenReturn(Optional.of(C2_GLOBAL_HASH));
when(mockResourceRepository.addResourceItem(resourceItem)).thenReturn(Optional.of(resourceItem));
when(mockResourceRepository.resourceItemBinaryPresent(resourceItem)).thenReturn(TRUE);
OperationState resultState =
testSyncResourceStrategy.synchronizeResourceRepository(C2_GLOBAL_HASH, List.of(resourceItem), URL_TO_CONTENT_DOWNLOAD_FUNCTION, PREFIXING_ENRICH_FUNCTION);
assertEquals(FULLY_APPLIED, resultState);
try {
verify(mockResourceRepository, never()).deleteResourceItem(any());
} catch (Exception e) {
}
}
@Test
public void testAddingNewItemFailureWhenTypeIsAssetAndPathContainsDoubleDots() {
List<ResourceItem> c2Items = List.of(
resourceItem("resource1", "valid_url", "../path", ASSET)
);
when(mockResourceRepository.findAllResourceItems()).thenReturn(List.of());
OperationState resultState =
testSyncResourceStrategy.synchronizeResourceRepository(C2_GLOBAL_HASH, c2Items, URL_TO_CONTENT_DOWNLOAD_FUNCTION, PREFIXING_ENRICH_FUNCTION);
assertEquals(NOT_APPLIED, resultState);
try {
verify(mockResourceRepository, never()).deleteResourceItem(any());
verify(mockResourceRepository, never()).addResourceItem(any());
verify(mockResourceRepository, never()).addResourceItem(any(), any());
verify(mockResourceRepository, never()).saveResourcesGlobalHash(C2_GLOBAL_HASH);
} catch (Exception e) {
}
}
@Test
public void testAddingNewItemFailureDueToIssueWithUrlEnrichment() {
List<ResourceItem> c2Items = List.of(
resourceItem("resource1", null, null, ASSET)
);
when(mockResourceRepository.findAllResourceItems()).thenReturn(List.of());
OperationState resultState =
testSyncResourceStrategy.synchronizeResourceRepository(C2_GLOBAL_HASH, c2Items, URL_TO_CONTENT_DOWNLOAD_FUNCTION, PREFIXING_ENRICH_FUNCTION);
assertEquals(NOT_APPLIED, resultState);
try {
verify(mockResourceRepository, never()).deleteResourceItem(any());
verify(mockResourceRepository, never()).saveResourcesGlobalHash(C2_GLOBAL_HASH);
} catch (Exception e) {
}
}
@Test
public void testAddingNewItemFailureDueToIssueInDownloadFunction() {
List<ResourceItem> c2Items = List.of(
resourceItem("resource1", FAIL_DOWNLOAD_URL, null, ASSET)
);
when(mockResourceRepository.findAllResourceItems()).thenReturn(List.of());
OperationState resultState =
testSyncResourceStrategy.synchronizeResourceRepository(C2_GLOBAL_HASH, c2Items, URL_TO_CONTENT_DOWNLOAD_FUNCTION, PREFIXING_ENRICH_FUNCTION);
assertEquals(NOT_APPLIED, resultState);
try {
verify(mockResourceRepository, never()).deleteResourceItem(any());
verify(mockResourceRepository, never()).saveResourcesGlobalHash(C2_GLOBAL_HASH);
} catch (Exception e) {
}
}
@Test
public void testAddingNewItemFailureDueToIssueInPersistFunction() {
List<ResourceItem> c2Items = List.of(
resourceItem("resource1", "url1", null, ASSET)
);
when(mockResourceRepository.findAllResourceItems()).thenReturn(List.of());
try (MockedStatic<Files> mockedFiles = mockStatic(Files.class)) {
mockedFiles.when(() -> createTempFile(anyString(), eq(null))).thenThrow(IOException.class);
OperationState resultState =
testSyncResourceStrategy.synchronizeResourceRepository(C2_GLOBAL_HASH, c2Items, URL_TO_CONTENT_DOWNLOAD_FUNCTION, PREFIXING_ENRICH_FUNCTION);
assertEquals(NOT_APPLIED, resultState);
try {
verify(mockResourceRepository, never()).deleteResourceItem(any());
verify(mockResourceRepository, never()).saveResourcesGlobalHash(C2_GLOBAL_HASH);
} catch (Exception e) {
}
}
}
@Test
public void testAddingNewItemFailureDueToIssueWhenUpdatingRepository() {
ResourceItem resourceItem = resourceItem("resource1", "url1", null, ASSET);
List<ResourceItem> c2Items = List.of(resourceItem);
when(mockResourceRepository.findAllResourceItems()).thenReturn(List.of());
try {
when(mockResourceRepository.addResourceItem(resourceItem)).thenThrow(Exception.class);
} catch (Exception e) {
}
OperationState resultState =
testSyncResourceStrategy.synchronizeResourceRepository(C2_GLOBAL_HASH, c2Items, URL_TO_CONTENT_DOWNLOAD_FUNCTION, PREFIXING_ENRICH_FUNCTION);
assertEquals(NOT_APPLIED, resultState);
try {
verify(mockResourceRepository, never()).deleteResourceItem(any());
verify(mockResourceRepository, never()).saveResourcesGlobalHash(C2_GLOBAL_HASH);
} catch (Exception e) {
}
}
@Test
public void testDeletingAllItems() {
List<ResourceItem> c2Items = List.of();
List<ResourceItem> agentItems = List.of(
resourceItem("resource1", "url1", null, ASSET),
resourceItem("resource2", "url2", null, ASSET),
resourceItem("resource3", "url3", null, EXTENSION)
);
when(mockResourceRepository.findAllResourceItems()).thenReturn(agentItems);
when(mockResourceRepository.saveResourcesGlobalHash(C2_GLOBAL_HASH)).thenReturn(Optional.of(C2_GLOBAL_HASH));
agentItems.forEach(agentItem -> {
try {
when(mockResourceRepository.deleteResourceItem(agentItem)).thenReturn(Optional.of(agentItem));
} catch (Exception e) {
}
});
OperationState resultState =
testSyncResourceStrategy.synchronizeResourceRepository(C2_GLOBAL_HASH, c2Items, URL_TO_CONTENT_DOWNLOAD_FUNCTION, PREFIXING_ENRICH_FUNCTION);
assertEquals(FULLY_APPLIED, resultState);
try {
verify(mockResourceRepository, never()).addResourceItem(any());
} catch (Exception e) {
}
}
@Test
public void testDeleteFailureDueToIssueWithUpdatingRepository() {
List<ResourceItem> c2Items = List.of();
List<ResourceItem> agentItems = List.of(
resourceItem("resource1", "url1", null, ASSET)
);
when(mockResourceRepository.findAllResourceItems()).thenReturn(agentItems);
agentItems.forEach(agentItem -> {
try {
when(mockResourceRepository.deleteResourceItem(agentItem)).thenThrow(Exception.class);
} catch (Exception e) {
}
});
OperationState resultState =
testSyncResourceStrategy.synchronizeResourceRepository(C2_GLOBAL_HASH, c2Items, URL_TO_CONTENT_DOWNLOAD_FUNCTION, PREFIXING_ENRICH_FUNCTION);
assertEquals(NOT_APPLIED, resultState);
try {
verify(mockResourceRepository, never()).addResourceItem(any());
verify(mockResourceRepository, never()).saveResourcesGlobalHash(C2_GLOBAL_HASH);
} catch (Exception e) {
}
}
@Test
public void testAddFileSuccessfulButUpdateGlobalHashFails() {
ResourceItem c2Item = resourceItem("resource1", "url1", null, ASSET);
when(mockResourceRepository.findAllResourceItems()).thenReturn(List.of());
try {
when(mockResourceRepository.addResourceItem(eq(c2Item), any())).thenReturn(Optional.of(c2Item));
when(mockResourceRepository.saveResourcesGlobalHash(C2_GLOBAL_HASH)).thenThrow(Exception.class);
} catch (Exception e) {
}
OperationState resultState =
testSyncResourceStrategy.synchronizeResourceRepository(C2_GLOBAL_HASH, List.of(c2Item), URL_TO_CONTENT_DOWNLOAD_FUNCTION, PREFIXING_ENRICH_FUNCTION);
assertEquals(PARTIALLY_APPLIED, resultState);
try {
verify(mockResourceRepository, never()).deleteResourceItem(any());
} catch (Exception e) {
}
}
private static ResourcesGlobalHash resourcesGlobalHash(String digest) {
ResourcesGlobalHash resourcesGlobalHash = new ResourcesGlobalHash();
resourcesGlobalHash.setDigest(digest);
return resourcesGlobalHash;
}
private ResourceItem resourceItem(String name, String url, String path, ResourceType resourceType) {
ResourceItem resourceItem = new ResourceItem();
resourceItem.setResourceName(name);
resourceItem.setUrl(url);
resourceItem.setResourceType(resourceType);
resourceItem.setResourcePath(path);
return resourceItem;
}
}

View File

@ -0,0 +1,450 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.minifi.c2.command.syncresource;
import static java.nio.file.Files.createDirectories;
import static java.nio.file.Files.exists;
import static java.nio.file.Files.readString;
import static java.nio.file.Files.writeString;
import static java.nio.file.StandardOpenOption.CREATE;
import static java.nio.file.StandardOpenOption.SYNC;
import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING;
import static java.nio.file.StandardOpenOption.WRITE;
import static java.util.UUID.randomUUID;
import static org.apache.commons.codec.digest.DigestUtils.sha512Hex;
import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.nifi.c2.protocol.api.ResourceType.ASSET;
import static org.apache.nifi.c2.protocol.api.ResourceType.EXTENSION;
import static org.apache.nifi.minifi.c2.command.syncresource.FileResourceRepository.ASSET_REPOSITORY_DIRECTORY;
import static org.apache.nifi.minifi.c2.command.syncresource.FileResourceRepository.RESOURCE_REPOSITORY_FILE_NAME;
import static org.apache.nifi.util.StringUtils.EMPTY;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertIterableEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrowsExactly;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mockStatic;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.Optional;
import org.apache.nifi.c2.protocol.api.ResourceItem;
import org.apache.nifi.c2.protocol.api.ResourceType;
import org.apache.nifi.c2.protocol.api.ResourcesGlobalHash;
import org.apache.nifi.c2.serializer.C2JacksonSerializer;
import org.apache.nifi.c2.serializer.C2Serializer;
import org.apache.nifi.minifi.c2.command.syncresource.FileResourceRepository.ResourceRepositoryDescriptor;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.io.TempDir;
import org.mockito.MockedStatic;
import org.mockito.junit.jupiter.MockitoExtension;
@ExtendWith(MockitoExtension.class)
public class FileResourceRepositoryTest {
private static final String RESOURCE_BINARY_CONTENT = "content";
@TempDir
private Path testBaseDirectory;
private C2Serializer c2Serializer;
private Path assetDirectory;
private Path assetRepositoryDirectory;
private Path extensionDirectory;
private Path repositoryFile;
private Path configDirectoryPath;
@BeforeEach
public void setup() throws IOException {
c2Serializer = new C2JacksonSerializer();
configDirectoryPath = testBaseDirectory.resolve("conf");
createDirectories(configDirectoryPath);
assetDirectory = testBaseDirectory.resolve("assets");
assetRepositoryDirectory = assetDirectory.resolve(ASSET_REPOSITORY_DIRECTORY);
createDirectories(assetRepositoryDirectory);
extensionDirectory = testBaseDirectory.resolve("extensions");
createDirectories(extensionDirectory);
repositoryFile = configDirectoryPath.resolve(RESOURCE_REPOSITORY_FILE_NAME);
}
@Test
public void testRepositoryInitializesWithEmptyContent() throws IOException {
FileResourceRepository testRepository = createTestRepository();
assertRepositoryInMemoryContentEqualsPersistedContent(testRepository);
assertNull(testRepository.findResourcesGlobalHash().getDigest());
assertNull(testRepository.findResourcesGlobalHash().getHashType());
assertEquals(List.of(), testRepository.findAllResourceItems());
}
@Test
public void testRepositoryInitializesWithExistingContent() throws IOException {
ResourcesGlobalHash resourcesGlobalHash = resourcesGlobalHash("digest", "hashType");
ResourceItem resourceItem = resourceItem("resourceId", null, ASSET);
ResourceRepositoryDescriptor initialRepositoryDescriptor = new ResourceRepositoryDescriptor(resourcesGlobalHash, List.of(resourceItem));
saveRepository(initialRepositoryDescriptor);
FileResourceRepository testRepository = createTestRepository();
assertRepositoryInMemoryContentEqualsPersistedContent(testRepository);
assertEquals("digest", testRepository.findResourcesGlobalHash().getDigest());
assertEquals("hashType", testRepository.findResourcesGlobalHash().getHashType());
assertEquals(1, testRepository.findAllResourceItems().size());
assertEquals("resourceId", testRepository.findAllResourceItems().getFirst().getResourceId());
}
@Test
public void testRepositoryInitializationFailure() {
try (MockedStatic<Files> mockedFiles = mockStatic(Files.class)) {
mockedFiles.when(() -> createDirectories(any())).thenThrow(new IOException());
assertThrowsExactly(RuntimeException.class, this::createTestRepository);
}
}
@Test
public void testSaveGlobalHashSuccess() throws IOException {
ResourcesGlobalHash originalGlobalHash = resourcesGlobalHash("digest1", "hashType1");
ResourceRepositoryDescriptor initialRepositoryDescriptor = new ResourceRepositoryDescriptor(originalGlobalHash, List.of());
saveRepository(initialRepositoryDescriptor);
FileResourceRepository testRepository = createTestRepository();
assertRepositoryInMemoryContentEqualsPersistedContent(testRepository);
assertEquals("digest1", testRepository.findResourcesGlobalHash().getDigest());
assertEquals("hashType1", testRepository.findResourcesGlobalHash().getHashType());
assertTrue(testRepository.findAllResourceItems().isEmpty());
ResourcesGlobalHash updatedGlobalHash = resourcesGlobalHash("digest2", "hashType2");
testRepository.saveResourcesGlobalHash(updatedGlobalHash);
assertRepositoryInMemoryContentEqualsPersistedContent(testRepository);
assertEquals("digest2", testRepository.findResourcesGlobalHash().getDigest());
assertEquals("hashType2", testRepository.findResourcesGlobalHash().getHashType());
assertTrue(testRepository.findAllResourceItems().isEmpty());
}
@Test
public void testSaveGlobalHashSuccessFailure() throws IOException {
ResourcesGlobalHash originalGlobalHash = resourcesGlobalHash("digest1", "hashType1");
ResourceRepositoryDescriptor initialRepositoryDescriptor = new ResourceRepositoryDescriptor(originalGlobalHash, List.of());
saveRepository(initialRepositoryDescriptor);
FileResourceRepository testRepository = createTestRepository();
assertRepositoryInMemoryContentEqualsPersistedContent(testRepository);
assertEquals("digest1", testRepository.findResourcesGlobalHash().getDigest());
assertEquals("hashType1", testRepository.findResourcesGlobalHash().getHashType());
assertTrue(testRepository.findAllResourceItems().isEmpty());
Optional<ResourcesGlobalHash> result;
try (MockedStatic<Files> mockedFiles = mockStatic(Files.class)) {
mockedFiles.when(() -> writeString(any(), any(), eq(CREATE), eq(TRUNCATE_EXISTING), eq(WRITE), eq(SYNC))).thenThrow(new IOException());
ResourcesGlobalHash updatedGlobalHash = resourcesGlobalHash("digest2", "hashType2");
result = testRepository.saveResourcesGlobalHash(updatedGlobalHash);
}
assertTrue(result.isEmpty());
assertRepositoryInMemoryContentEqualsPersistedContent(testRepository);
assertEquals("digest1", testRepository.findResourcesGlobalHash().getDigest());
assertEquals("hashType1", testRepository.findResourcesGlobalHash().getHashType());
assertTrue(testRepository.findAllResourceItems().isEmpty());
}
@Test
public void testResourceItemBinaryPresent() throws IOException {
String content = "content";
String digest = sha512Hex(content);
ResourceItem resourceItem = resourceItem("resource1", null, ASSET, "SHA-512", digest);
createResourceBinary(resourceItem, content);
FileResourceRepository testRepository = createTestRepository();
boolean result = testRepository.resourceItemBinaryPresent(resourceItem);
assertTrue(result);
}
@Test
public void testResourceItemBinaryPresentHashMismatch() throws IOException {
String content = "content";
ResourceItem resourceItem = resourceItem("resource1", null, ASSET, "SHA-512", "not_matching_hash");
createResourceBinary(resourceItem, content);
FileResourceRepository testRepository = createTestRepository();
boolean result = testRepository.resourceItemBinaryPresent(resourceItem);
assertFalse(result);
}
@Test
public void testResourceItemBinaryPresentUnsupportedHashAlgorithm() throws IOException {
String content = "content";
ResourceItem resourceItem = resourceItem("resource1", null, ASSET, "unsupported_algorithm", "some_hash");
createResourceBinary(resourceItem, content);
FileResourceRepository testRepository = createTestRepository();
assertThrowsExactly(RuntimeException.class, () -> testRepository.resourceItemBinaryPresent(resourceItem));
}
@Test
public void testAddResourceItemsWithoutContent() throws IOException {
FileResourceRepository testRepository = createTestRepository();
assertRepositoryInMemoryContentEqualsPersistedContent(testRepository);
assertTrue(testRepository.findAllResourceItems().isEmpty());
ResourceItem firstNewItem = resourceItem("resource1", null, ASSET);
Optional<ResourceItem> firstResult = testRepository.addResourceItem(firstNewItem);
assertTrue(firstResult.isPresent());
assertRepositoryInMemoryContentEqualsPersistedContent(testRepository);
assertEquals(1, testRepository.findAllResourceItems().size());
assertEquals("resource1", testRepository.findAllResourceItems().getFirst().getResourceId());
ResourceItem secondNewItem = resourceItem("resource2", null, ASSET);
Optional<ResourceItem> secondResult = testRepository.addResourceItem(secondNewItem);
assertTrue(secondResult.isPresent());
assertRepositoryInMemoryContentEqualsPersistedContent(testRepository);
assertEquals(2, testRepository.findAllResourceItems().size());
assertIterableEquals(List.of("resource1", "resource2"), testRepository.findAllResourceItems().stream().map(ResourceItem::getResourceId).toList());
}
@Test
public void testAddResourceItemsWithoutContentErrorCase() throws IOException {
FileResourceRepository testRepository = createTestRepository();
assertRepositoryInMemoryContentEqualsPersistedContent(testRepository);
assertTrue(testRepository.findAllResourceItems().isEmpty());
ResourceItem firstNewItem = resourceItem("resource1", null, ASSET);
Optional<ResourceItem> result;
try (MockedStatic<Files> mockedFiles = mockStatic(Files.class)) {
mockedFiles.when(() -> writeString(any(), any(), eq(CREATE), eq(TRUNCATE_EXISTING), eq(WRITE), eq(SYNC))).thenThrow(new IOException());
result = testRepository.addResourceItem(firstNewItem);
}
assertTrue(result.isEmpty());
}
@Test
public void testAddResourceItemsWithContent() throws IOException {
FileResourceRepository testRepository = createTestRepository();
assertRepositoryInMemoryContentEqualsPersistedContent(testRepository);
assertTrue(testRepository.findAllResourceItems().isEmpty());
ResourceItem firstNewItem = resourceItem("resource1", null, ASSET);
Path firstItemTempPath = createTempBinary();
Path firstItemExpectedPath = resourcePath(firstNewItem);
Optional<ResourceItem> firstResult = testRepository.addResourceItem(firstNewItem, firstItemTempPath);
assertTrue(firstResult.isPresent());
assertRepositoryInMemoryContentEqualsPersistedContent(testRepository);
assertEquals(1, testRepository.findAllResourceItems().size());
assertEquals("resource1", testRepository.findAllResourceItems().getFirst().getResourceId());
assertFalse(exists(firstItemTempPath));
assertTrue(exists(firstItemExpectedPath));
ResourceItem secondNewItem = resourceItem("resource2", "subdirectory", ASSET);
Path secondItemTempPath = createTempBinary();
Path secondItemExpectedPath = resourcePath(secondNewItem);
Optional<ResourceItem> secondResult = testRepository.addResourceItem(secondNewItem, secondItemTempPath);
assertTrue(secondResult.isPresent());
assertRepositoryInMemoryContentEqualsPersistedContent(testRepository);
assertEquals(2, testRepository.findAllResourceItems().size());
assertIterableEquals(List.of("resource1", "resource2"), testRepository.findAllResourceItems().stream().map(ResourceItem::getResourceId).toList());
assertFalse(exists(secondItemTempPath));
assertTrue(exists(firstItemExpectedPath));
assertTrue(exists(secondItemExpectedPath));
ResourceItem thirdNewItem = resourceItem("resource3", null, EXTENSION);
Path thirdItemTempPath = createTempBinary();
Path thirdItemExpectedPath = resourcePath(thirdNewItem);
Optional<ResourceItem> thirdResult = testRepository.addResourceItem(thirdNewItem, thirdItemTempPath);
assertTrue(thirdResult.isPresent());
assertRepositoryInMemoryContentEqualsPersistedContent(testRepository);
assertEquals(3, testRepository.findAllResourceItems().size());
assertIterableEquals(List.of("resource1", "resource2", "resource3"), testRepository.findAllResourceItems().stream().map(ResourceItem::getResourceId).toList());
assertFalse(exists(thirdItemTempPath));
assertTrue(exists(firstItemExpectedPath));
assertTrue(exists(secondItemExpectedPath));
assertTrue(exists(thirdItemExpectedPath));
}
@Test
public void testAddResourceItemsWithContentErrorWhenCopying() throws IOException {
FileResourceRepository testRepository = createTestRepository();
assertRepositoryInMemoryContentEqualsPersistedContent(testRepository);
assertTrue(testRepository.findAllResourceItems().isEmpty());
ResourceItem resourceItem = resourceItem("resource1", null, ASSET);
Path resourceItemTempPath = Path.of("non_existing_path");
Path resourceItemExpectedPath = resourcePath(resourceItem);
Optional<ResourceItem> result = testRepository.addResourceItem(resourceItem, resourceItemTempPath);
assertTrue(result.isEmpty());
assertRepositoryInMemoryContentEqualsPersistedContent(testRepository);
assertEquals(0, testRepository.findAllResourceItems().size());
assertFalse(exists(resourceItemTempPath));
assertFalse(exists(resourceItemExpectedPath));
}
@Test
public void testDeleteResourceItem() throws IOException {
ResourceItem firstItem = resourceItem("resource1", null, ASSET);
Path firstItemPath = createResourceBinary(firstItem, RESOURCE_BINARY_CONTENT);
ResourceItem secondItem = resourceItem("resource2", null, ASSET);
Path secondItemPath = createResourceBinary(secondItem, RESOURCE_BINARY_CONTENT);
ResourceItem thirdItem = resourceItem("resource3", null, ASSET);
Path thirdItemPath = createResourceBinary(thirdItem, RESOURCE_BINARY_CONTENT);
ResourceRepositoryDescriptor initialRepositoryDescriptor = new ResourceRepositoryDescriptor(new ResourcesGlobalHash(), List.of(firstItem, secondItem, thirdItem));
saveRepository(initialRepositoryDescriptor);
FileResourceRepository testRepository = createTestRepository();
assertRepositoryInMemoryContentEqualsPersistedContent(testRepository);
assertEquals(3, testRepository.findAllResourceItems().size());
assertIterableEquals(List.of("resource1", "resource2", "resource3"), testRepository.findAllResourceItems().stream().map(ResourceItem::getResourceId).toList());
assertTrue(exists(firstItemPath));
assertTrue(exists(secondItemPath));
assertTrue(exists(thirdItemPath));
Optional<ResourceItem> firstResult = testRepository.deleteResourceItem(firstItem);
assertTrue(firstResult.isPresent());
assertRepositoryInMemoryContentEqualsPersistedContent(testRepository);
assertEquals(2, testRepository.findAllResourceItems().size());
assertIterableEquals(List.of("resource2", "resource3"), testRepository.findAllResourceItems().stream().map(ResourceItem::getResourceId).toList());
assertFalse(exists(firstItemPath));
assertTrue(exists(secondItemPath));
assertTrue(exists(thirdItemPath));
Optional<ResourceItem> secondResult = testRepository.deleteResourceItem(thirdItem);
assertTrue(secondResult.isPresent());
assertRepositoryInMemoryContentEqualsPersistedContent(testRepository);
assertEquals(1, testRepository.findAllResourceItems().size());
assertIterableEquals(List.of("resource2"), testRepository.findAllResourceItems().stream().map(ResourceItem::getResourceId).toList());
assertFalse(exists(firstItemPath));
assertTrue(exists(secondItemPath));
assertFalse(exists(thirdItemPath));
Optional<ResourceItem> thirdResult = testRepository.deleteResourceItem(secondItem);
assertTrue(thirdResult.isPresent());
assertRepositoryInMemoryContentEqualsPersistedContent(testRepository);
assertEquals(0, testRepository.findAllResourceItems().size());
assertIterableEquals(List.of(), testRepository.findAllResourceItems().stream().map(ResourceItem::getResourceId).toList());
assertFalse(exists(firstItemPath));
assertFalse(exists(secondItemPath));
assertFalse(exists(thirdItemPath));
}
@Test
public void testDeleteResourceItemErrorCase() throws IOException {
FileResourceRepository testRepository = createTestRepository();
assertRepositoryInMemoryContentEqualsPersistedContent(testRepository);
assertTrue(testRepository.findAllResourceItems().isEmpty());
ResourceItem toDeleteItem = resourceItem("resource1", null, ASSET);
Optional<ResourceItem> result;
try (MockedStatic<Files> mockedFiles = mockStatic(Files.class)) {
mockedFiles.when(() -> writeString(any(), any(), eq(CREATE), eq(TRUNCATE_EXISTING), eq(WRITE), eq(SYNC))).thenThrow(new IOException());
result = testRepository.deleteResourceItem(toDeleteItem);
}
assertTrue(result.isEmpty());
}
private ResourceRepositoryDescriptor loadRepository() throws IOException {
return c2Serializer.deserialize(readString(repositoryFile), ResourceRepositoryDescriptor.class).orElse(null);
}
private void saveRepository(ResourceRepositoryDescriptor resourceRepositoryDescriptor) throws IOException {
writeString(repositoryFile, c2Serializer.serialize(resourceRepositoryDescriptor).orElse(EMPTY), CREATE, TRUNCATE_EXISTING, WRITE, SYNC);
}
private FileResourceRepository createTestRepository() {
return new FileResourceRepository(assetDirectory, extensionDirectory, configDirectoryPath, c2Serializer);
}
private ResourcesGlobalHash resourcesGlobalHash(String digest, String hashType) {
ResourcesGlobalHash resourcesGlobalHash = new ResourcesGlobalHash();
resourcesGlobalHash.setDigest(digest);
resourcesGlobalHash.setHashType(hashType);
return resourcesGlobalHash;
}
private ResourceItem resourceItem(String id, String path, ResourceType resourceType) {
return resourceItem(id, path, resourceType, null, null);
}
private ResourceItem resourceItem(String id, String path, ResourceType resourceType, String hashType, String digest) {
ResourceItem resourceItem = new ResourceItem();
resourceItem.setResourceId(id);
resourceItem.setResourceName(id);
resourceItem.setResourcePath(path);
resourceItem.setResourceType(resourceType);
resourceItem.setHashType(hashType);
resourceItem.setDigest(digest);
return resourceItem;
}
private void assertRepositoryInMemoryContentEqualsPersistedContent(FileResourceRepository testRepository) throws IOException {
assertTrue(exists(repositoryFile));
ResourceRepositoryDescriptor loadedRepositoryDescriptor = loadRepository();
assertNotNull(loadedRepositoryDescriptor);
assertEquals(loadedRepositoryDescriptor.resourcesGlobalHash(), testRepository.findResourcesGlobalHash());
assertIterableEquals(loadedRepositoryDescriptor.resourceItems(), testRepository.findAllResourceItems());
}
private Path resourcePath(ResourceItem item) {
Path resourcePath = switch (item.getResourceType()) {
case ASSET -> assetRepositoryDirectory.resolve(isBlank(item.getResourcePath()) ? item.getResourceName() : item.getResourcePath() + "/" + item.getResourceName());
case EXTENSION -> extensionDirectory.resolve(item.getResourceName());
};
return resourcePath.toAbsolutePath();
}
private Path createResourceBinary(ResourceItem resourceItem, String content) throws IOException {
Path resourcePath = resourcePath(resourceItem);
createFile(resourcePath, content);
return resourcePath;
}
private Path createTempBinary() throws IOException {
Path resourcePath = testBaseDirectory.resolve(randomUUID().toString());
createFile(resourcePath, RESOURCE_BINARY_CONTENT);
return resourcePath;
}
private void createFile(Path path, String content) throws IOException {
createDirectories(path.getParent());
writeString(path, content);
}
}

View File

@ -36,9 +36,6 @@ java.arg.1=-Dorg.apache.jasper.compiler.disablejsr199=true
java.arg.2=-Xms${minifi.jvm.heap.mb}m
java.arg.3=-Xmx${minifi.jvm.heap.mb}m
# Enable Remote Debugging
#java.arg.debug=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=8000
# allowRestrictedHeaders is required for Cluster/Node communications to work properly
java.arg.5=-Dsun.net.http.allowRestrictedHeaders=true
java.arg.6=-Djava.protocol.handler.pkgs=sun.net.www.protocol
@ -91,13 +88,6 @@ nifi.minifi.sensitive.props.algorithm=
#c2.agent.identifier=
# If set to false heartbeat won't contain the manifest. Defaults to true.
#c2.full.heartbeat=false
## define protocol parameters
# DEPRECATED: c2.rest.url and c2.rest.url.ack are deprecated in favor of c2.rest.path.* properties and are target to be removed in future release
# The absolute url of the C2 server's heartbeat endpoint, eg.: http://localhost/c2-server/api/heartbeat
#c2.rest.url=
# The absolute url of the C2 server's acknowledge endpoint, eg.: http://localhost/c2-server/api/acknowledge
#c2.rest.url.ack=
# C2 Rest Path Properties
# The base path of the C2 server's REST API, eg.: http://localhost/c2-server/api
#c2.rest.path.base=
# Relative url of the C2 server's heartbeat endpoint, eg.: /heartbeat