NIFI-13614: populate failure cause in operation state

This closes #9136.

Signed-off-by: Ferenc Kis <briansolo1985@gmail.com>
This commit is contained in:
Kalman Jantner 2024-08-21 14:59:30 +02:00 committed by Ferenc Kis
parent d1432d6be9
commit 5c3e22aa44
No known key found for this signature in database
GPG Key ID: 5E1CCAC15A5958F2
25 changed files with 321 additions and 108 deletions

View File

@ -85,7 +85,7 @@ public interface C2Client {
*
* @param absoluteUrl absolute url sent by C2 server
* @param relativeUrl relative url sent by C2 server
* @return an optional with content of finalised callback url
* @return finalised callback url
*/
Optional<String> getCallbackUrl(String absoluteUrl, String relativeUrl);
String getCallbackUrl(String absoluteUrl, String relativeUrl);
}

View File

@ -146,7 +146,7 @@ public class C2HttpClient implements C2Client {
}
@Override
public Optional<String> getCallbackUrl(String absoluteUrl, String relativeUrl) {
public String getCallbackUrl(String absoluteUrl, String relativeUrl) {
return c2UrlProvider.getCallbackUrl(absoluteUrl, relativeUrl);
}

View File

@ -17,8 +17,6 @@
package org.apache.nifi.c2.client.http.url;
import java.util.Optional;
public interface C2UrlProvider {
/**
@ -42,5 +40,5 @@ public interface C2UrlProvider {
* @param relativeUrl relative url sent by the C2 server
* @return the url of the C2 server to send requests to
*/
Optional<String> getCallbackUrl(String absoluteUrl, String relativeUrl);
String getCallbackUrl(String absoluteUrl, String relativeUrl);
}

View File

@ -45,11 +45,12 @@ public class LegacyC2UrlProvider implements C2UrlProvider {
}
@Override
public Optional<String> getCallbackUrl(String absoluteUrl, String relativeUrl) {
Optional<String> url = Optional.ofNullable(absoluteUrl).filter(StringUtils::isNotBlank);
if (!url.isPresent()) {
LOG.error("Provided absolute url was empty or null. Relative urls are not supported with this configuration");
}
return url;
public String getCallbackUrl(String absoluteUrl, String relativeUrl) {
return Optional.ofNullable(absoluteUrl)
.filter(StringUtils::isNotBlank)
.orElseThrow( () -> {
LOG.error("Provided absolute url was empty or null. Relative urls are not supported with this configuration");
throw new IllegalArgumentException("Provided absolute url was empty or null. Relative C2 urls are not supported with this configuration");
});
}
}

View File

@ -59,11 +59,12 @@ public class ProxyAwareC2UrlProvider implements C2UrlProvider {
}
@Override
public Optional<String> getCallbackUrl(String absoluteUrl, String relativeUrl) {
public String getCallbackUrl(String absoluteUrl, String relativeUrl) {
return Optional.ofNullable(relativeUrl)
.map(this::toAbsoluteUrl)
.filter(Optional::isPresent)
.orElseGet(() -> Optional.ofNullable(absoluteUrl).filter(StringUtils::isNotBlank));
.orElseGet(() -> Optional.ofNullable(absoluteUrl).filter(StringUtils::isNotBlank))
.orElseThrow(() -> new IllegalArgumentException("Unable to return non empty c2 url."));
}
private Optional<String> toAbsoluteUrl(String path) {

View File

@ -18,6 +18,7 @@
package org.apache.nifi.c2.client.http.url;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import java.util.Optional;
import java.util.stream.Stream;
@ -33,6 +34,9 @@ public class LegacyC2UrlProviderTest {
private static final String C2_HEARTBEAT_URL = "https://host:8080/c2/api/heartbeat";
private static final String C2_ACKNOWLEDGE_URL = "https://host:8080/c2/api/acknowledge";
private static final String ABSOLUTE_URL = "http://c2/api/callback";
private static final String RELATIVE_URL = "any_url";
private static final String EXPECTED_URL = "http://c2/api/callback";
@Test
public void testProviderIsCreatedAndReturnsProperHeartbeatAndAcknowledgeUrls() {
@ -44,9 +48,15 @@ public class LegacyC2UrlProviderTest {
@MethodSource("testCallbackUrlProvidedArguments")
@ParameterizedTest(name = "{index} => absoluteUrl={0}, relativeUrl={1}, expectedCallbackUrl={2}")
public void testCallbackUrlProvidedFor(String absoluteUrl, String relativeUrl, Optional<String> expectedCallbackUrl) {
public void testCallbackUrlProvidedForInvalidInputs(String absoluteUrl, String relativeUrl) {
LegacyC2UrlProvider testProvider = new LegacyC2UrlProvider(C2_HEARTBEAT_URL, C2_ACKNOWLEDGE_URL);
assertEquals(expectedCallbackUrl, testProvider.getCallbackUrl(absoluteUrl, relativeUrl));
assertThrows(IllegalArgumentException.class, () -> testProvider.getCallbackUrl(absoluteUrl, relativeUrl));
}
@Test
public void testCallbackUrlProvidedFor() {
LegacyC2UrlProvider testProvider = new LegacyC2UrlProvider(C2_HEARTBEAT_URL, C2_ACKNOWLEDGE_URL);
assertEquals(EXPECTED_URL, testProvider.getCallbackUrl(ABSOLUTE_URL, RELATIVE_URL));
}
private static Stream<Arguments> testCallbackUrlProvidedArguments() {
@ -54,8 +64,7 @@ public class LegacyC2UrlProviderTest {
Arguments.of(null, null, Optional.empty()),
Arguments.of(null, "any_url", Optional.empty()),
Arguments.of("", "", Optional.empty()),
Arguments.of("", "any_url", Optional.empty()),
Arguments.of("http://c2/api/callback", "any_url", Optional.of("http://c2/api/callback"))
Arguments.of("", "any_url", Optional.empty())
);
}
}

View File

@ -18,6 +18,7 @@
package org.apache.nifi.c2.client.http.url;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertThrowsExactly;
import java.util.Optional;
@ -83,16 +84,36 @@ public class ProxyAwareC2ProviderTest {
@MethodSource("testCallbackUrlProvidedArguments")
@ParameterizedTest(name = "{index} => c2RestBase={0}, absoluteUrl={1}, relativeUrl={2}, expectedCallbackUrl={3}")
public void testCallbackUrlProvidedFor(String c2RestBase, String absoluteUrl, String relativeUrl, Optional<String> expectedCallbackUrl) {
public void testCallbackUrlProvidedForValidInputs(String c2RestBase, String absoluteUrl, String relativeUrl, String expectedCallbackUrl) {
ProxyAwareC2UrlProvider testProvider = new ProxyAwareC2UrlProvider(c2RestBase, "any_path", "any_path");
assertEquals(expectedCallbackUrl, testProvider.getCallbackUrl(absoluteUrl, relativeUrl));
}
@MethodSource("testCallbackUrlProvidedInvalidArguments")
@ParameterizedTest(name = "{index} => c2RestBase={0}, absoluteUrl={1}, relativeUrl={2}, expectedCallbackUrl={3}")
public void testCallbackUrlProvidedForInvalidInputs(String c2RestBase, String absoluteUrl, String relativeUrl) {
ProxyAwareC2UrlProvider testProvider = new ProxyAwareC2UrlProvider(c2RestBase, "any_path", "any_path");
assertThrows(IllegalArgumentException.class, () -> testProvider.getCallbackUrl(absoluteUrl, relativeUrl));
}
private static Stream<Arguments> testCallbackUrlProvidedArguments() {
String c2RestBaseNoTrailingSlash = "http://c2/api";
String c2RestBaseWithTrailingSlash = "http://c2/api/";
String path = "path/endpoint";
String absoluteUrl = "http://c2-other/api/path/endpoint";
return Stream.of(
Arguments.of(c2RestBaseNoTrailingSlash, null, path, c2RestBaseWithTrailingSlash + path),
Arguments.of(c2RestBaseNoTrailingSlash, "", "/" + path, c2RestBaseWithTrailingSlash + path),
Arguments.of(c2RestBaseWithTrailingSlash, null, path, c2RestBaseWithTrailingSlash + path),
Arguments.of(c2RestBaseWithTrailingSlash, "", "/" + path, c2RestBaseWithTrailingSlash + path),
Arguments.of(c2RestBaseWithTrailingSlash, absoluteUrl, null, absoluteUrl),
Arguments.of(c2RestBaseWithTrailingSlash, absoluteUrl, "", absoluteUrl)
);
}
private static Stream<Arguments> testCallbackUrlProvidedInvalidArguments() {
String c2RestBaseNoTrailingSlash = "http://c2/api";
String c2RestBaseWithTrailingSlash = "http://c2/api/";
return Stream.of(
Arguments.of(c2RestBaseNoTrailingSlash, null, null, Optional.empty()),
Arguments.of(c2RestBaseNoTrailingSlash, "", null, Optional.empty()),
@ -101,13 +122,7 @@ public class ProxyAwareC2ProviderTest {
Arguments.of(c2RestBaseWithTrailingSlash, null, null, Optional.empty()),
Arguments.of(c2RestBaseWithTrailingSlash, "", null, Optional.empty()),
Arguments.of(c2RestBaseWithTrailingSlash, null, "", Optional.empty()),
Arguments.of(c2RestBaseWithTrailingSlash, "", "", Optional.empty()),
Arguments.of(c2RestBaseNoTrailingSlash, null, path, Optional.of(c2RestBaseWithTrailingSlash + path)),
Arguments.of(c2RestBaseNoTrailingSlash, "", "/" + path, Optional.of(c2RestBaseWithTrailingSlash + path)),
Arguments.of(c2RestBaseWithTrailingSlash, null, path, Optional.of(c2RestBaseWithTrailingSlash + path)),
Arguments.of(c2RestBaseWithTrailingSlash, "", "/" + path, Optional.of(c2RestBaseWithTrailingSlash + path)),
Arguments.of(c2RestBaseWithTrailingSlash, absoluteUrl, null, Optional.of(absoluteUrl)),
Arguments.of(c2RestBaseWithTrailingSlash, absoluteUrl, "", Optional.of(absoluteUrl))
Arguments.of(c2RestBaseWithTrailingSlash, "", "", Optional.empty())
);
}
}

View File

@ -20,14 +20,18 @@ package org.apache.nifi.c2.client.service.operation;
import static java.util.Optional.ofNullable;
import com.fasterxml.jackson.core.type.TypeReference;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.nifi.c2.protocol.api.C2Operation;
import org.apache.nifi.c2.protocol.api.C2OperationAck;
import org.apache.nifi.c2.protocol.api.C2OperationState;
import org.apache.nifi.c2.protocol.api.FailureCause;
import org.apache.nifi.c2.protocol.api.OperandType;
import org.apache.nifi.c2.protocol.api.OperationType;
import org.apache.nifi.c2.serializer.C2Serializer;
import org.apache.nifi.minifi.validator.ValidationException;
/**
* Handler interface for the different operation types
@ -79,13 +83,38 @@ public interface C2OperationHandler {
* @param details additional status info to detail the state
* @return the created state
*/
default C2OperationState operationState(C2OperationState.OperationState operationState, String details) {
default C2OperationState operationState(C2OperationState.OperationState operationState, String details, Exception e) {
C2OperationState state = new C2OperationState();
state.setState(operationState);
state.setDetails(details);
ofNullable(e).map(this::toFailureCause).ifPresent(state::setFailureCause);
return state;
}
private FailureCause toFailureCause(Exception exception) {
FailureCause failureCause = new FailureCause();
failureCause.setExceptionMessage(exception.getMessage());
List<String> causeList = new LinkedList<>();
populateCausedChain(ofNullable(exception.getCause()), causeList);
failureCause.setCausedByMessages(causeList);
if (exception instanceof ValidationException validationException) {
failureCause.setValidationResults(validationException.getValidationResults());
}
return failureCause;
}
private List<String> populateCausedChain(Optional<Throwable> cause, List<String> causeList) {
cause.ifPresent(c -> {
causeList.add(c.getMessage());
populateCausedChain(cause.map(Throwable::getCause), causeList);
});
return causeList;
}
default C2OperationState operationState(C2OperationState.OperationState operationState, String details) {
return operationState(operationState, details, null);
}
/**
* Commonly used logic for creating an C2OperationAck object
*
@ -122,9 +151,10 @@ public interface C2OperationHandler {
* @param serializer the serializer used to converting to the target class
* @return the optional retrieved and converted argument value
*/
default <T> Optional<T> getOperationArg(C2Operation operation, String argument, TypeReference<T> type, C2Serializer serializer) {
default <T> T getOperationArg(C2Operation operation, String argument, TypeReference<T> type, C2Serializer serializer) {
return ofNullable(operation.getArgs())
.map(args -> args.get(argument))
.flatMap(arg -> serializer.convert(arg, type));
.flatMap(arg -> serializer.convert(arg, type))
.orElseThrow(() -> new IllegalArgumentException("Failed to parse argument " + argument + " of operation " + operation));
}
}

View File

@ -20,6 +20,7 @@ package org.apache.nifi.c2.client.service.operation;
import static java.util.Optional.ofNullable;
import static org.apache.commons.lang3.StringUtils.EMPTY;
import static org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.FULLY_APPLIED;
import static org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.NOT_APPLIED;
import static org.apache.nifi.c2.protocol.api.OperandType.MANIFEST;
import static org.apache.nifi.c2.protocol.api.OperationType.DESCRIBE;
@ -33,12 +34,16 @@ import org.apache.nifi.c2.protocol.api.C2Operation;
import org.apache.nifi.c2.protocol.api.C2OperationAck;
import org.apache.nifi.c2.protocol.api.OperandType;
import org.apache.nifi.c2.protocol.api.OperationType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class DescribeManifestOperationHandler implements C2OperationHandler {
private static final String ERROR_MESSAGE = "Failed to execute manifest describe operation.";
private final C2HeartbeatFactory heartbeatFactory;
private final Supplier<RuntimeInfoWrapper> runtimeInfoSupplier;
private final OperandPropertiesProvider operandPropertiesProvider;
private static final Logger LOGGER = LoggerFactory.getLogger(DescribeManifestOperationHandler.class);
public DescribeManifestOperationHandler(C2HeartbeatFactory heartbeatFactory, Supplier<RuntimeInfoWrapper> runtimeInfoSupplier,
OperandPropertiesProvider operandPropertiesProvider) {
@ -60,15 +65,20 @@ public class DescribeManifestOperationHandler implements C2OperationHandler {
@Override
public C2OperationAck handle(C2Operation operation) {
String operationId = ofNullable(operation.getIdentifier()).orElse(EMPTY);
C2OperationAck c2OperationAck;
try {
RuntimeInfoWrapper runtimeInfoWrapper = runtimeInfoSupplier.get();
C2Heartbeat heartbeat = heartbeatFactory.create(runtimeInfoWrapper);
RuntimeInfoWrapper runtimeInfoWrapper = runtimeInfoSupplier.get();
C2Heartbeat heartbeat = heartbeatFactory.create(runtimeInfoWrapper);
C2OperationAck c2OperationAck = operationAck(operationId, operationState(FULLY_APPLIED, EMPTY));
c2OperationAck.setAgentInfo(agentInfo(heartbeat, runtimeInfoWrapper));
c2OperationAck.setDeviceInfo(heartbeat.getDeviceInfo());
c2OperationAck.setFlowInfo(heartbeat.getFlowInfo());
c2OperationAck.setResourceInfo(heartbeat.getResourceInfo());
c2OperationAck = operationAck(operationId, operationState(FULLY_APPLIED, EMPTY));
c2OperationAck.setAgentInfo(agentInfo(heartbeat, runtimeInfoWrapper));
c2OperationAck.setDeviceInfo(heartbeat.getDeviceInfo());
c2OperationAck.setFlowInfo(heartbeat.getFlowInfo());
c2OperationAck.setResourceInfo(heartbeat.getResourceInfo());
} catch (Exception e) {
LOGGER.error(ERROR_MESSAGE, e);
c2OperationAck = operationAck(operationId, operationState(NOT_APPLIED, ERROR_MESSAGE, e));
}
return c2OperationAck;
}

View File

@ -27,7 +27,6 @@ import static org.apache.nifi.c2.util.Preconditions.requires;
import com.fasterxml.jackson.core.type.TypeReference;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.nifi.c2.client.api.C2Client;
import org.apache.nifi.c2.protocol.api.C2Operation;
import org.apache.nifi.c2.protocol.api.C2OperationAck;
@ -89,20 +88,24 @@ public class SyncResourceOperationHandler implements C2OperationHandler {
public C2OperationAck handle(C2Operation operation) {
String operationId = ofNullable(operation.getIdentifier()).orElse(EMPTY);
Optional<ResourcesGlobalHash> resourcesGlobalHash = getOperationArg(operation, GLOBAL_HASH_FIELD, new TypeReference<>() {
}, c2Serializer);
if (resourcesGlobalHash.isEmpty()) {
ResourcesGlobalHash resourcesGlobalHash;
try {
resourcesGlobalHash = getOperationArg(operation, GLOBAL_HASH_FIELD, new TypeReference<>() { }, c2Serializer);
} catch (Exception e) {
LOG.error("Resources global hash could not be constructed from C2 request");
return operationAck(operationId, operationState(NOT_APPLIED, "Resources global hash element was not found"));
}
Optional<List<ResourceItem>> resourceItems = getOperationArg(operation, RESOURCE_LIST_FIELD, new TypeReference<>() {
}, c2Serializer);
if (resourceItems.isEmpty()) {
LOG.error("Resource item list could not be constructed from C2 request");
return operationAck(operationId, operationState(NOT_APPLIED, "Resource item list element was not found"));
return operationAck(operationId, operationState(NOT_APPLIED, "Resources global hash element was not found", e));
}
OperationState operationState = syncResourceStrategy.synchronizeResourceRepository(resourcesGlobalHash.get(), resourceItems.get(), c2Client::retrieveResourceItem,
List<ResourceItem> resourceItems;
try {
resourceItems = getOperationArg(operation, RESOURCE_LIST_FIELD, new TypeReference<>() { }, c2Serializer);
} catch (Exception e) {
LOG.error("Resource item list could not be constructed from C2 request");
return operationAck(operationId, operationState(NOT_APPLIED, "Resource item list element was not found", e));
}
OperationState operationState = syncResourceStrategy.synchronizeResourceRepository(resourcesGlobalHash, resourceItems, c2Client::retrieveResourceItem,
relativeUrl -> c2Client.getCallbackUrl(null, relativeUrl));
C2OperationState resultState = operationState(
operationState,

View File

@ -31,5 +31,5 @@ public interface SyncResourceStrategy {
OperationState synchronizeResourceRepository(ResourcesGlobalHash resourcesGlobalHash, List<ResourceItem> c2ServerItems,
BiFunction<String, Function<InputStream, Optional<Path>>, Optional<Path>> resourceDownloadFunction,
Function<String, Optional<String>> urlEnrichFunction);
Function<String, String> urlEnrichFunction);
}

View File

@ -113,10 +113,12 @@ public class TransferDebugOperationHandler implements C2OperationHandler {
public C2OperationAck handle(C2Operation operation) {
String operationId = ofNullable(operation.getIdentifier()).orElse(EMPTY);
Optional<String> callbackUrl = c2Client.getCallbackUrl(getOperationArg(operation, TARGET_ARG).orElse(EMPTY), getOperationArg(operation, RELATIVE_TARGET_ARG).orElse(EMPTY));
if (callbackUrl.isEmpty()) {
String callbackUrl;
try {
callbackUrl = c2Client.getCallbackUrl(getOperationArg(operation, TARGET_ARG).orElse(EMPTY), getOperationArg(operation, RELATIVE_TARGET_ARG).orElse(EMPTY));
} catch (Exception e) {
LOG.error("Callback URL could not be constructed from C2 request and current configuration");
return operationAck(operationId, operationState(NOT_APPLIED, C2_CALLBACK_URL_NOT_FOUND));
return operationAck(operationId, operationState(NOT_APPLIED, C2_CALLBACK_URL_NOT_FOUND, e));
}
List<Path> preparedFiles = null;
@ -124,7 +126,7 @@ public class TransferDebugOperationHandler implements C2OperationHandler {
try {
preparedFiles = prepareFiles(operationId, bundleFilePaths);
operationState = createDebugBundle(preparedFiles)
.map(bundle -> c2Client.uploadBundle(callbackUrl.get(), bundle)
.map(bundle -> c2Client.uploadBundle(callbackUrl, bundle)
.map(errorMessage -> operationState(NOT_APPLIED, errorMessage))
.orElseGet(() -> operationState(FULLY_APPLIED, SUCCESSFUL_UPLOAD)))
.orElseGet(() -> operationState(NOT_APPLIED, UNABLE_TO_CREATE_BUNDLE));

View File

@ -97,9 +97,11 @@ public class UpdateAssetOperationHandler implements C2OperationHandler {
public C2OperationAck handle(C2Operation operation) {
String operationId = ofNullable(operation.getIdentifier()).orElse(EMPTY);
Optional<String> callbackUrl =
c2Client.getCallbackUrl(getOperationArg(operation, ASSET_URL_KEY).orElse(EMPTY), getOperationArg(operation, ASSET_RELATIVE_URL_KEY).orElse(EMPTY));
if (callbackUrl.isEmpty()) {
String callbackUrl;
try {
callbackUrl = c2Client.getCallbackUrl(getOperationArg(operation, ASSET_URL_KEY).orElse(EMPTY), getOperationArg(operation, ASSET_RELATIVE_URL_KEY).orElse(EMPTY));
} catch (Exception e) {
LOG.error("Callback URL could not be constructed from C2 request and current configuration");
return operationAck(operationId, operationState(NOT_APPLIED, C2_CALLBACK_URL_NOT_FOUND));
}
@ -114,7 +116,7 @@ public class UpdateAssetOperationHandler implements C2OperationHandler {
LOG.info("Initiating asset update from url {} with name {}, force update is {}", callbackUrl, assetFileName, forceDownload);
C2OperationState operationState = assetUpdatePrecondition.test(assetFileName.get(), forceDownload)
? c2Client.retrieveUpdateAssetContent(callbackUrl.get())
? c2Client.retrieveUpdateAssetContent(callbackUrl)
.map(content -> assetPersistFunction.apply(assetFileName.get(), content)
? operationState(FULLY_APPLIED, SUCCESSFULLY_UPDATE_ASSET)
: operationState(NOT_APPLIED, FAILED_TO_PERSIST_ASSET_TO_DISK))

View File

@ -91,13 +91,15 @@ public class UpdateConfigurationOperationHandler implements C2OperationHandler {
String operationId = ofNullable(operation.getIdentifier()).orElse(EMPTY);
String absoluteFlowUrl = getOperationArg(operation, FLOW_URL_KEY).orElse(getOperationArg(operation, LOCATION).orElse(EMPTY));
Optional<String> callbackUrl = client.getCallbackUrl(absoluteFlowUrl, getOperationArg(operation, FLOW_RELATIVE_URL_KEY).orElse(EMPTY));
if (callbackUrl.isEmpty()) {
String callbackUrl;
try {
callbackUrl = client.getCallbackUrl(absoluteFlowUrl, getOperationArg(operation, FLOW_RELATIVE_URL_KEY).orElse(EMPTY));
} catch (Exception e) {
logger.error("Callback URL could not be constructed from C2 request and current configuration");
return operationAck(operationId, operationState(NOT_APPLIED, "Could not get callback url from operation and current configuration"));
}
Optional<String> flowId = getFlowId(operation, callbackUrl.get());
Optional<String> flowId = getFlowId(operation, callbackUrl);
if (flowId.isEmpty()) {
logger.error("FlowId is missing, no update will be performed");
return operationAck(operationId, operationState(NOT_APPLIED, "Could not get flowId from the operation"));
@ -110,7 +112,7 @@ public class UpdateConfigurationOperationHandler implements C2OperationHandler {
logger.info("Will perform flow update from {} for operation #{}. Previous flow id was {}, replacing with new id {}",
callbackUrl, operationId, ofNullable(flowIdHolder.getFlowId()).orElse("not set"), flowId.get());
C2OperationState state = updateFlow(operationId, callbackUrl.get());
C2OperationState state = updateFlow(operationId, callbackUrl);
if (state.getState() == FULLY_APPLIED) {
flowIdHolder.setFlowId(flowId.get());
}
@ -125,9 +127,11 @@ public class UpdateConfigurationOperationHandler implements C2OperationHandler {
return operationState(NOT_APPLIED, "Update content retrieval resulted in empty content");
}
if (!updateConfigurationStrategy.update(updateContent.get())) {
try {
updateConfigurationStrategy.update(updateContent.get());
} catch (Exception e) {
logger.error("Update resulted in error for operation #{}.", opIdentifier);
return operationState(NOT_APPLIED, "Update resulted in error");
return operationState(NOT_APPLIED, "Update resulted in error:", e);
}
logger.debug("Update configuration applied for operation #{}.", opIdentifier);

View File

@ -27,7 +27,7 @@ public interface UpdateConfigurationStrategy {
* Updates the MiNiFi agent's flow with the flow passed as parameter
*
* @param flow the MiNiFi flow config JSON represented as a byte array
* @return true if the flow update was true, false otherwise
* @throw exception if update failed.
*/
boolean update(byte[] flow);
void update(byte[] flow);
}

View File

@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.minifi.validator;
import java.util.List;
import java.util.Objects;
import org.apache.nifi.components.ValidationResult;
public class ValidationException extends IllegalStateException {
private List<ValidationResult> validationResults;
public ValidationException(String message, List<ValidationResult> details) {
super(message);
this.validationResults = details;
}
public List<ValidationResult> getValidationResults() {
return validationResults;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ValidationException that = (ValidationException) o;
return Objects.equals(validationResults, that.validationResults);
}
@Override
public int hashCode() {
return Objects.hash(validationResults);
}
@Override
public String toString() {
return "ValidationException{" +
"validationResults=" + validationResults +
"message=" + this.getMessage() +
'}';
}
}

View File

@ -51,7 +51,6 @@ import java.nio.file.Paths;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Predicate;
import java.util.stream.Stream;
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
@ -139,7 +138,7 @@ public class TransferDebugOperationHandlerTest {
.collect(toList());
TransferDebugOperationHandler testHandler = TransferDebugOperationHandler.create(c2Client, operandPropertiesProvider, createBundleFiles, DEFAULT_CONTENT_FILTER);
C2Operation c2Operation = operation(C2_DEBUG_UPLOAD_ENDPOINT);
when(c2Client.getCallbackUrl(any(), any())).thenReturn(Optional.of(C2_DEBUG_UPLOAD_ENDPOINT));
when(c2Client.getCallbackUrl(any(), any())).thenReturn(C2_DEBUG_UPLOAD_ENDPOINT);
// when
C2OperationAck result = testHandler.handle(c2Operation);
@ -200,7 +199,7 @@ public class TransferDebugOperationHandlerTest {
Predicate<String> testContentFilter = content -> !content.contains(filterKeyword);
TransferDebugOperationHandler testHandler = TransferDebugOperationHandler.create(c2Client, operandPropertiesProvider, singletonList(bundleFile), testContentFilter);
C2Operation c2Operation = operation(C2_DEBUG_UPLOAD_ENDPOINT);
when(c2Client.getCallbackUrl(any(), any())).thenReturn(Optional.of(C2_DEBUG_UPLOAD_ENDPOINT));
when(c2Client.getCallbackUrl(any(), any())).thenReturn(C2_DEBUG_UPLOAD_ENDPOINT);
// when
C2OperationAck result = testHandler.handle(c2Operation);

View File

@ -94,7 +94,7 @@ public class UpdateAssetOperationHandlerTest {
@BeforeEach
public void setup() {
lenient().when(c2Client.getCallbackUrl(any(), any())).thenReturn(Optional.of(ASSET_URL));
lenient().when(c2Client.getCallbackUrl(any(), any())).thenReturn(ASSET_URL);
}
@ParameterizedTest(name = "c2Client={0} operandPropertiesProvider={1} bundleFileList={2} contentFilter={3}")
@ -115,7 +115,7 @@ public class UpdateAssetOperationHandlerTest {
public void testAssetUrlCanNotBeNull() {
// given
C2Operation operation = operation(null, ASSET_FILE_NAME, FORCE_DOWNLOAD);
when(c2Client.getCallbackUrl(any(), any())).thenReturn(Optional.empty());
when(c2Client.getCallbackUrl(any(), any())).thenThrow(new IllegalArgumentException());
// when
C2OperationAck result = testHandler.handle(operation);

View File

@ -22,6 +22,8 @@ import static org.apache.nifi.c2.client.service.operation.UpdateConfigurationOpe
import static org.apache.nifi.c2.client.service.operation.UpdateConfigurationOperationHandler.LOCATION;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ -72,7 +74,7 @@ public class UpdateConfigurationOperationHandlerTest {
C2Operation operation = new C2Operation();
operation.setArgs(INCORRECT_LOCATION_MAP);
when(client.getCallbackUrl(any(), any())).thenReturn(Optional.of(INCORRECT_LOCATION));
when(client.getCallbackUrl(any(), any())).thenReturn(INCORRECT_LOCATION);
C2OperationAck response = handler.handle(operation);
@ -81,10 +83,10 @@ public class UpdateConfigurationOperationHandlerTest {
@Test
void testHandleFlowIdInArg() {
UpdateConfigurationStrategy successUpdate = flow -> true;
UpdateConfigurationStrategy successUpdate = mock(UpdateConfigurationStrategy.class);
when(flowIdHolder.getFlowId()).thenReturn(FLOW_ID);
when(client.retrieveUpdateConfigurationContent(any())).thenReturn(Optional.of("content".getBytes()));
when(client.getCallbackUrl(any(), any())).thenReturn(Optional.of(INCORRECT_LOCATION));
when(client.getCallbackUrl(any(), any())).thenReturn(INCORRECT_LOCATION);
UpdateConfigurationOperationHandler handler = new UpdateConfigurationOperationHandler(client, flowIdHolder, successUpdate, operandPropertiesProvider);
C2Operation operation = new C2Operation();
operation.setIdentifier(OPERATION_ID);
@ -103,7 +105,7 @@ public class UpdateConfigurationOperationHandlerTest {
@Test
void testHandleReturnsNoOperationWithNoContent() {
when(flowIdHolder.getFlowId()).thenReturn(FLOW_ID);
when(client.getCallbackUrl(any(), any())).thenReturn(Optional.of(CORRECT_LOCATION));
when(client.getCallbackUrl(any(), any())).thenReturn(CORRECT_LOCATION);
UpdateConfigurationOperationHandler handler = new UpdateConfigurationOperationHandler(client, flowIdHolder, null, operandPropertiesProvider);
C2Operation operation = new C2Operation();
operation.setArgs(CORRECT_LOCATION_MAP);
@ -116,10 +118,11 @@ public class UpdateConfigurationOperationHandlerTest {
@Test
void testHandleReturnsNotAppliedWithContentApplyIssues() {
UpdateConfigurationStrategy failedToUpdate = flow -> false;
UpdateConfigurationStrategy failedToUpdate = mock(UpdateConfigurationStrategy.class);
doThrow(new IllegalStateException()).when(failedToUpdate).update(any());
when(flowIdHolder.getFlowId()).thenReturn("previous_flow_id");
when(client.retrieveUpdateConfigurationContent(any())).thenReturn(Optional.of("content".getBytes()));
when(client.getCallbackUrl(any(), any())).thenReturn(Optional.of(CORRECT_LOCATION));
when(client.getCallbackUrl(any(), any())).thenReturn(CORRECT_LOCATION);
UpdateConfigurationOperationHandler handler = new UpdateConfigurationOperationHandler(client, flowIdHolder, failedToUpdate, operandPropertiesProvider);
C2Operation operation = new C2Operation();
operation.setIdentifier(OPERATION_ID);
@ -133,9 +136,9 @@ public class UpdateConfigurationOperationHandlerTest {
@Test
void testHandleReturnsFullyApplied() {
UpdateConfigurationStrategy successUpdate = flow -> true;
UpdateConfigurationStrategy successUpdate = mock(UpdateConfigurationStrategy.class);
when(flowIdHolder.getFlowId()).thenReturn("previous_flow_id");
when(client.getCallbackUrl(any(), any())).thenReturn(Optional.of(CORRECT_LOCATION));
when(client.getCallbackUrl(any(), any())).thenReturn(CORRECT_LOCATION);
when(client.retrieveUpdateConfigurationContent(any())).thenReturn(Optional.of("content".getBytes()));
UpdateConfigurationOperationHandler handler = new UpdateConfigurationOperationHandler(client, flowIdHolder, successUpdate, operandPropertiesProvider);
C2Operation operation = new C2Operation();

View File

@ -42,6 +42,17 @@ public class C2OperationState implements Serializable {
@Schema(description = "Additional details about the state")
private String details;
@Schema(description = "Additional details about the cause of the failure")
private FailureCause failureCause;
public FailureCause getFailureCause() {
return failureCause;
}
public void setFailureCause(FailureCause failureCause) {
this.failureCause = failureCause;
}
public String getDetails() {
return details;
}
@ -76,12 +87,12 @@ public class C2OperationState implements Serializable {
return false;
}
C2OperationState that = (C2OperationState) o;
return state == that.state && Objects.equals(details, that.details);
return state == that.state && Objects.equals(details, that.details) && Objects.equals(failureCause, that.failureCause);
}
@Override
public int hashCode() {
return Objects.hash(state, details);
return Objects.hash(state, details, failureCause);
}
@Override
@ -89,6 +100,7 @@ public class C2OperationState implements Serializable {
return "C2OperationState{" +
"state=" + state +
", details='" + details + '\'' +
", failureCause='" + failureCause + '\'' +
'}';
}

View File

@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.c2.protocol.api;
import java.util.List;
import java.util.Objects;
import org.apache.nifi.components.ValidationResult;
public class FailureCause {
private List<ValidationResult> validationResults;
private String exceptionMessage;
private List<String> causedByMessages;
public List<ValidationResult> getValidationResults() {
return validationResults;
}
public void setValidationResults(List<ValidationResult> validationResults) {
this.validationResults = validationResults;
}
public String getExceptionMessage() {
return exceptionMessage;
}
public void setExceptionMessage(String exceptionMessage) {
this.exceptionMessage = exceptionMessage;
}
public List<String> getCausedByMessages() {
return causedByMessages;
}
public void setCausedByMessages(List<String> causedByMessages) {
this.causedByMessages = causedByMessages;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
FailureCause that = (FailureCause) o;
return Objects.equals(validationResults, that.validationResults) && Objects.equals(exceptionMessage, that.exceptionMessage) && Objects.equals(causedByMessages, that.causedByMessages);
}
@Override
public int hashCode() {
return Objects.hash(validationResults, exceptionMessage, causedByMessages);
}
}

View File

@ -54,6 +54,7 @@ import org.apache.nifi.groups.RemoteProcessGroup;
import org.apache.nifi.minifi.commons.service.FlowEnrichService;
import org.apache.nifi.minifi.commons.service.FlowPropertyEncryptor;
import org.apache.nifi.minifi.commons.service.FlowSerDeService;
import org.apache.nifi.minifi.validator.ValidationException;
import org.apache.nifi.services.FlowService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -91,7 +92,7 @@ public class DefaultUpdateConfigurationStrategy implements UpdateConfigurationSt
}
@Override
public boolean update(byte[] rawFlow) {
public void update(byte[] rawFlow) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Attempting to update flow with content: \n{}", new String(rawFlow, UTF_8));
}
@ -116,22 +117,24 @@ public class DefaultUpdateConfigurationStrategy implements UpdateConfigurationSt
reloadFlow(findAllProposedConnectionIds(enrichedFlowCandidate.getRootGroup()));
return true;
} catch (IllegalStateException e) {
LOGGER.error("Configuration update failed. Reverting and reloading previous flow", e);
revert(backupFlowConfigurationFile, flowConfigurationFile);
revert(backupRawFlowConfigurationFile, rawFlowConfigurationFile);
try {
reloadFlow(originalConnectionIds);
} catch (IOException ex) {
LOGGER.error("Unable to reload the reverted flow", e);
} catch (ValidationException ex) {
LOGGER.error("Unable to reload the reverted flow", ex);
throw ex;
} catch (Exception exception) {
throw new RuntimeException(exception);
}
return false;
throw e;
} catch (Exception e) {
LOGGER.error("Configuration update failed. Reverting to previous flow, no reload is necessary", e);
revert(backupFlowConfigurationFile, flowConfigurationFile);
revert(backupRawFlowConfigurationFile, rawFlowConfigurationFile);
return false;
throw new RuntimeException(e);
} finally {
removeIfExists(backupFlowConfigurationFile);
removeIfExists(backupRawFlowConfigurationFile);
@ -148,7 +151,7 @@ public class DefaultUpdateConfigurationStrategy implements UpdateConfigurationSt
List<ValidationResult> validationErrors = validate(flowController.getFlowManager());
if (!validationErrors.isEmpty()) {
LOGGER.error("Validation errors found when reloading the flow: {}", validationErrors);
throw new IllegalStateException("Unable to start flow due to validation errors");
throw new ValidationException("Unable to start flow due to validation errors", validationErrors);
}
flowController.getFlowManager().getRootGroup().startProcessing();

View File

@ -73,7 +73,7 @@ public class DefaultSyncResourceStrategy implements SyncResourceStrategy {
@Override
public OperationState synchronizeResourceRepository(ResourcesGlobalHash c2GlobalHash, List<ResourceItem> c2ServerItems,
BiFunction<String, Function<InputStream, Optional<Path>>, Optional<Path>> resourceDownloadFunction,
Function<String, Optional<String>> urlEnrichFunction) {
Function<String, String> urlEnrichFunction) {
Set<ResourceItem> c2Items = Set.copyOf(c2ServerItems);
Set<ResourceItem> agentItems = Set.copyOf(resourceRepository.findAllResourceItems());
@ -89,7 +89,7 @@ public class DefaultSyncResourceStrategy implements SyncResourceStrategy {
private OperationState saveNewItems(Set<ResourceItem> c2Items, Set<ResourceItem> agentItems,
BiFunction<String, Function<InputStream, Optional<Path>>, Optional<Path>> resourceDownloadFunction,
Function<String, Optional<String>> urlEnrichFunction) {
Function<String, String> urlEnrichFunction) {
List<ResourceItem> newItems = c2Items.stream().filter(not(agentItems::contains)).toList();
if (newItems.isEmpty()) {
return NO_OPERATION;
@ -107,10 +107,10 @@ public class DefaultSyncResourceStrategy implements SyncResourceStrategy {
}
private Function<ResourceItem, Optional<ResourceItem>> downloadIfNotPresentAndAddToRepository(
BiFunction<String, Function<InputStream, Optional<Path>>, Optional<Path>> resourceDownloadFunction, Function<String, Optional<String>> urlEnrichFunction) {
BiFunction<String, Function<InputStream, Optional<Path>>, Optional<Path>> resourceDownloadFunction, Function<String, String> urlEnrichFunction) {
return resourceItem -> resourceRepository.resourceItemBinaryPresent(resourceItem)
? resourceRepository.addResourceItem(resourceItem)
: urlEnrichFunction.apply(resourceItem.getUrl())
: Optional.ofNullable(urlEnrichFunction.apply(resourceItem.getUrl()))
.flatMap(enrichedUrl -> resourceDownloadFunction.apply(enrichedUrl, this::persistToTemporaryLocation))
.flatMap(tempResourcePath -> resourceRepository.addResourceItem(resourceItem, tempResourcePath));
}

View File

@ -127,10 +127,9 @@ public class DefaultUpdateConfigurationStrategyTest {
when(mockFlowManager.getRootGroup()).thenReturn(mockProcessGroup);
// when
boolean result = testUpdateConfigurationStrategy.update(NEW_RAW_FLOW_CONFIG_CONTENT);
testUpdateConfigurationStrategy.update(NEW_RAW_FLOW_CONFIG_CONTENT);
//then
assertTrue(result);
assertTrue(exists(flowConfigurationFile));
assertTrue(exists(rawFlowConfigurationFile));
assertArrayEquals(NEW_ENRICHED_FLOW_CONFIG_CONTENT, readGzipFile(flowConfigurationFile));
@ -155,19 +154,20 @@ public class DefaultUpdateConfigurationStrategyTest {
doThrow(new IOException()).when(mockFlowService).load(null);
// when
boolean result = testUpdateConfigurationStrategy.update(NEW_RAW_FLOW_CONFIG_CONTENT);
//then
assertFalse(result);
assertTrue(exists(flowConfigurationFile));
assertTrue(exists(rawFlowConfigurationFile));
assertArrayEquals(ORIGINAL_ENRICHED_FLOW_CONFIG_CONTENT, readGzipFile(flowConfigurationFile));
assertArrayEquals(ORIGINAL_RAW_FLOW_CONFIG_CONTENT, readPlainTextFile(rawFlowConfigurationFile));
assertFalse(exists(backupFlowConfigurationFile));
assertFalse(exists(backupRawFlowConfigurationFile));
verify(mockFlowService, times(1)).load(null);
verify(mockFlowController, times(0)).onFlowInitialized(true);
verify(mockProcessGroup, times(0)).startProcessing();
try {
testUpdateConfigurationStrategy.update(NEW_RAW_FLOW_CONFIG_CONTENT);
} catch (Exception e) {
//then
assertTrue(exists(flowConfigurationFile));
assertTrue(exists(rawFlowConfigurationFile));
assertArrayEquals(ORIGINAL_ENRICHED_FLOW_CONFIG_CONTENT, readGzipFile(flowConfigurationFile));
assertArrayEquals(ORIGINAL_RAW_FLOW_CONFIG_CONTENT, readPlainTextFile(rawFlowConfigurationFile));
assertFalse(exists(backupFlowConfigurationFile));
assertFalse(exists(backupRawFlowConfigurationFile));
verify(mockFlowService, times(1)).load(null);
verify(mockFlowController, times(0)).onFlowInitialized(true);
verify(mockProcessGroup, times(0)).startProcessing();
}
}
private void writeGzipFile(Path path, byte[] content) throws IOException {

View File

@ -70,7 +70,7 @@ public class DefaultSyncResourceStrategyTest {
(url, persistFunction) -> url.endsWith(FAIL_DOWNLOAD_URL) ? empty() : persistFunction.apply(new ByteArrayInputStream(url.getBytes()));
private static String ENRICH_PREFIX = "pre_";
private static final Function<String, Optional<String>> PREFIXING_ENRICH_FUNCTION = url -> ofNullable(url).map(arg -> ENRICH_PREFIX + arg);
private static final Function<String, String> PREFIXING_ENRICH_FUNCTION = url -> ofNullable(url).map(arg -> ENRICH_PREFIX + arg).orElse("");
@Mock
private ResourceRepository mockResourceRepository;