From 65558accda6e368179d7761e4a943f9543f75c71 Mon Sep 17 00:00:00 2001 From: Ferenc Kis Date: Mon, 24 Oct 2022 20:04:51 +0200 Subject: [PATCH] NIFI-10679 MiNiFi C2 Update Asset Command Signed-off-by: Csaba Bejan This closes #6583. --- .../apache/nifi/c2/client/C2ClientConfig.java | 12 + ...ava => TransferDebugOperationHandler.java} | 25 ++- .../UpdateAssetOperationHandler.java | 150 +++++++++++++ ...=> TransferDebugOperationHandlerTest.java} | 20 +- .../UpdateAssetOperationHandlerTest.java | 212 ++++++++++++++++++ .../c2/protocol/api/C2OperationState.java | 7 +- .../nifi/c2/protocol/api/OperandType.java | 3 +- .../nifi/c2/protocol/api/OperationType.java | 3 +- .../src/main/resources/conf/bootstrap.conf | 4 +- .../org/apache/nifi/c2/C2NiFiProperties.java | 3 + .../apache/nifi/c2/C2NifiClientService.java | 61 ++--- .../command/TransferDebugCommandHelper.java | 70 ++++++ .../c2/command/UpdateAssetCommandHelper.java | 69 ++++++ .../TransferDebugCommandHelperTest.java} | 18 +- .../command/UpdateAssetCommandHelperTest.java | 142 ++++++++++++ 15 files changed, 725 insertions(+), 74 deletions(-) rename c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/{DebugOperationHandler.java => TransferDebugOperationHandler.java} (89%) create mode 100644 c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/UpdateAssetOperationHandler.java rename c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/{DebugOperationHandlerTest.java => TransferDebugOperationHandlerTest.java} (89%) create mode 100644 c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/UpdateAssetOperationHandlerTest.java create mode 100644 nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/c2/command/TransferDebugCommandHelper.java create mode 100644 nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/c2/command/UpdateAssetCommandHelper.java rename nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/c2/{C2NifiClientServiceTest.java => command/TransferDebugCommandHelperTest.java} (81%) create mode 100644 nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/c2/command/UpdateAssetCommandHelperTest.java diff --git a/c2/c2-client-bundle/c2-client-base/src/main/java/org/apache/nifi/c2/client/C2ClientConfig.java b/c2/c2-client-bundle/c2-client-base/src/main/java/org/apache/nifi/c2/client/C2ClientConfig.java index 90f14e8037..2d823320c3 100644 --- a/c2/c2-client-bundle/c2-client-base/src/main/java/org/apache/nifi/c2/client/C2ClientConfig.java +++ b/c2/c2-client-bundle/c2-client-base/src/main/java/org/apache/nifi/c2/client/C2ClientConfig.java @@ -41,6 +41,7 @@ public class C2ClientConfig { private final long readTimeout; private final long connectTimeout; private final String c2RequestCompression; + private final String c2AssetDirectory; private C2ClientConfig(final Builder builder) { this.c2Url = builder.c2Url; @@ -63,6 +64,7 @@ public class C2ClientConfig { this.readTimeout = builder.readTimeout; this.connectTimeout = builder.connectTimeout; this.c2RequestCompression = builder.c2RequestCompression; + this.c2AssetDirectory = builder.c2AssetDirectory; } public String getC2Url() { @@ -145,6 +147,10 @@ public class C2ClientConfig { return c2RequestCompression; } + public String getC2AssetDirectory() { + return c2AssetDirectory; + } + /** * Builder for client configuration. */ @@ -170,6 +176,7 @@ public class C2ClientConfig { private long readTimeout; private long connectTimeout; private String c2RequestCompression; + private String c2AssetDirectory; public Builder c2Url(final String c2Url) { this.c2Url = c2Url; @@ -271,6 +278,11 @@ public class C2ClientConfig { return this; } + public Builder c2AssetDirectory(final String c2AssetDirectory) { + this.c2AssetDirectory = c2AssetDirectory; + return this; + } + public C2ClientConfig build() { return new C2ClientConfig(this); } diff --git a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/DebugOperationHandler.java b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/TransferDebugOperationHandler.java similarity index 89% rename from c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/DebugOperationHandler.java rename to c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/TransferDebugOperationHandler.java index 22b9cd18b4..ef73bbc8d7 100644 --- a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/DebugOperationHandler.java +++ b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/TransferDebugOperationHandler.java @@ -21,6 +21,7 @@ import static java.nio.file.Files.copy; import static java.nio.file.Files.createTempDirectory; import static java.nio.file.Files.lines; import static java.nio.file.Files.walk; +import static java.util.Collections.emptyMap; import static java.util.Optional.empty; import static java.util.Optional.ofNullable; import static org.apache.commons.compress.utils.IOUtils.closeQuietly; @@ -57,9 +58,9 @@ import org.apache.nifi.c2.protocol.api.OperationType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class DebugOperationHandler implements C2OperationHandler { +public class TransferDebugOperationHandler implements C2OperationHandler { - private static final Logger LOG = LoggerFactory.getLogger(DebugOperationHandler.class); + private static final Logger LOG = LoggerFactory.getLogger(TransferDebugOperationHandler.class); private static final String C2_CALLBACK_URL_NOT_FOUND = "C2 Server callback URL was not found in request"; private static final String SUCCESSFUL_UPLOAD = "Debug bundle was uploaded successfully"; @@ -69,31 +70,33 @@ public class DebugOperationHandler implements C2OperationHandler { static final String NEW_LINE = "\n"; private final C2Client c2Client; + private final OperandPropertiesProvider operandPropertiesProvider; private final List bundleFilePaths; private final Predicate contentFilter; - private final OperandPropertiesProvider operandPropertiesProvider; - private DebugOperationHandler(C2Client c2Client, List bundleFilePaths, Predicate contentFilter, - OperandPropertiesProvider operandPropertiesProvider) { + private TransferDebugOperationHandler(C2Client c2Client, OperandPropertiesProvider operandPropertiesProvider, + List bundleFilePaths, Predicate contentFilter) { this.c2Client = c2Client; + this.operandPropertiesProvider = operandPropertiesProvider; this.bundleFilePaths = bundleFilePaths; this.contentFilter = contentFilter; - this.operandPropertiesProvider = operandPropertiesProvider; } - public static DebugOperationHandler create(C2Client c2Client, List bundleFilePaths, Predicate contentFilter, - OperandPropertiesProvider operandPropertiesProvider) { + public static TransferDebugOperationHandler create(C2Client c2Client, OperandPropertiesProvider operandPropertiesProvider, + List bundleFilePaths, Predicate contentFilter) { if (c2Client == null) { throw new IllegalArgumentException("C2Client should not be null"); } + if (operandPropertiesProvider == null) { + throw new IllegalArgumentException("OperandPropertiesProvider should not be not null"); + } if (bundleFilePaths == null || bundleFilePaths.isEmpty()) { throw new IllegalArgumentException("bundleFilePaths should not be not null or empty"); } if (contentFilter == null) { throw new IllegalArgumentException("Content filter should not be null"); } - - return new DebugOperationHandler(c2Client, bundleFilePaths, contentFilter, operandPropertiesProvider); + return new TransferDebugOperationHandler(c2Client, operandPropertiesProvider, bundleFilePaths, contentFilter); } @Override @@ -113,7 +116,7 @@ public class DebugOperationHandler implements C2OperationHandler { @Override public C2OperationAck handle(C2Operation operation) { - String debugCallbackUrl = operation.getArgs().get(TARGET_ARG); + String debugCallbackUrl = ofNullable(operation.getArgs()).orElse(emptyMap()).get(TARGET_ARG); if (debugCallbackUrl == null) { LOG.error("Callback URL was not found in C2 request."); return operationAck(operation, operationState(NOT_APPLIED, C2_CALLBACK_URL_NOT_FOUND)); diff --git a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/UpdateAssetOperationHandler.java b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/UpdateAssetOperationHandler.java new file mode 100644 index 0000000000..b73e66cf32 --- /dev/null +++ b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/UpdateAssetOperationHandler.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.c2.client.service.operation; + +import static java.lang.Boolean.parseBoolean; +import static java.util.Collections.emptyMap; +import static java.util.Optional.ofNullable; +import static org.apache.commons.lang3.StringUtils.EMPTY; +import static org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.FULLY_APPLIED; +import static org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.NOT_APPLIED; +import static org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.NO_OPERATION; +import static org.apache.nifi.c2.protocol.api.OperandType.ASSET; +import static org.apache.nifi.c2.protocol.api.OperationType.UPDATE; + +import java.util.Map; +import java.util.function.BiFunction; +import java.util.function.BiPredicate; +import org.apache.nifi.c2.client.api.C2Client; +import org.apache.nifi.c2.protocol.api.C2Operation; +import org.apache.nifi.c2.protocol.api.C2OperationAck; +import org.apache.nifi.c2.protocol.api.C2OperationState; +import org.apache.nifi.c2.protocol.api.C2OperationState.OperationState; +import org.apache.nifi.c2.protocol.api.OperandType; +import org.apache.nifi.c2.protocol.api.OperationType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class UpdateAssetOperationHandler implements C2OperationHandler { + + static final String ASSET_URL_KEY = "url"; + static final String ASSET_FILE_KEY = "file"; + static final String ASSET_FORCE_DOWNLOAD_KEY = "forceDownload"; + + static final String C2_CALLBACK_URL_NOT_FOUND = "C2 Server callback URL was not found in request"; + static final String ASSET_FILE_NAME_NOT_FOUND = "Asset file name was not found in request"; + static final String SUCCESSFULLY_UPDATE_ASSET = "Successfully update asset"; + static final String FAILED_TO_PERSIST_ASSET_TO_DISK = "Failed to persist asset to disk"; + static final String UPDATE_ASSET_RETRIEVAL_RESULTED_IN_EMPTY_CONTENT = "Update asset retrieval resulted in empty content"; + static final String UPDATE_ASSET_PRECONDITIONS_WERE_NOT_MET = "Update asset preconditions were not met. No update will be performed"; + + private static final Logger LOG = LoggerFactory.getLogger(UpdateAssetOperationHandler.class); + + private final C2Client client; + private final OperandPropertiesProvider operandPropertiesProvider; + private final BiPredicate assetUpdatePrecondition; + private final BiFunction assetPersistFunction; + + public UpdateAssetOperationHandler(C2Client client, OperandPropertiesProvider operandPropertiesProvider, + BiPredicate assetUpdatePrecondition, BiFunction assetPersistFunction) { + this.client = client; + this.operandPropertiesProvider = operandPropertiesProvider; + this.assetUpdatePrecondition = assetUpdatePrecondition; + this.assetPersistFunction = assetPersistFunction; + } + + public static UpdateAssetOperationHandler create(C2Client c2Client, OperandPropertiesProvider operandPropertiesProvider, + BiPredicate assetUpdatePrecondition, BiFunction assetPersistFunction) { + if (c2Client == null) { + throw new IllegalArgumentException("C2Client should not be null"); + } + if (operandPropertiesProvider == null) { + throw new IllegalArgumentException("OperandPropertiesProvider should not be not null"); + } + if (assetUpdatePrecondition == null) { + throw new IllegalArgumentException("Asset update precondition should not be null"); + } + if (assetPersistFunction == null) { + throw new IllegalArgumentException("Asset persist function should not be null"); + } + + return new UpdateAssetOperationHandler(c2Client, operandPropertiesProvider, assetUpdatePrecondition, assetPersistFunction); + } + + @Override + public OperationType getOperationType() { + return UPDATE; + } + + @Override + public OperandType getOperandType() { + return ASSET; + } + + @Override + public Map getProperties() { + return operandPropertiesProvider.getProperties(); + } + + @Override + public C2OperationAck handle(C2Operation operation) { + String operationId = ofNullable(operation.getIdentifier()).orElse(EMPTY); + + String assetUrl = getOperationArg(operation, ASSET_URL_KEY); + if (assetUrl == null) { + LOG.error("Callback URL with key={} was not found in C2 request. C2 request arguments={}", ASSET_URL_KEY, operation.getArgs()); + return operationAck(operationId, operationState(NOT_APPLIED, C2_CALLBACK_URL_NOT_FOUND)); + } + String assetFileName = getOperationArg(operation, ASSET_FILE_KEY); + if (assetFileName == null) { + LOG.error("Asset file name with key={} was not found in C2 request. C2 request arguments={}", ASSET_FILE_KEY, operation.getArgs()); + return operationAck(operationId, operationState(NOT_APPLIED, ASSET_FILE_NAME_NOT_FOUND)); + } + boolean forceDownload = parseBoolean(getOperationArg(operation, ASSET_FORCE_DOWNLOAD_KEY)); + + LOG.info("Initiating asset update from url {} with name {}, force update is {}", assetUrl, assetFileName, forceDownload); + + C2OperationState operationState = assetUpdatePrecondition.test(assetFileName, forceDownload) + ? client.retrieveUpdateContent(assetUrl) + .map(content -> assetPersistFunction.apply(assetFileName, content) + ? operationState(FULLY_APPLIED, SUCCESSFULLY_UPDATE_ASSET) + : operationState(NOT_APPLIED, FAILED_TO_PERSIST_ASSET_TO_DISK)) + .orElseGet(() -> operationState(NOT_APPLIED, UPDATE_ASSET_RETRIEVAL_RESULTED_IN_EMPTY_CONTENT)) + : operationState(NO_OPERATION, UPDATE_ASSET_PRECONDITIONS_WERE_NOT_MET); + + return operationAck(operationId, operationState); + } + + private String getOperationArg(C2Operation operation, String argument) { + return ofNullable(operation.getArgs()).orElse(emptyMap()).get(argument); + } + + private C2OperationState operationState(OperationState operationState, String details) { + C2OperationState state = new C2OperationState(); + state.setState(operationState); + state.setDetails(details); + return state; + } + + private C2OperationAck operationAck(String operationId, C2OperationState operationState) { + C2OperationAck operationAck = new C2OperationAck(); + operationAck.setOperationState(operationState); + operationAck.setOperationId(operationId); + return operationAck; + } +} diff --git a/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/DebugOperationHandlerTest.java b/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/TransferDebugOperationHandlerTest.java similarity index 89% rename from c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/DebugOperationHandlerTest.java rename to c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/TransferDebugOperationHandlerTest.java index 31ff4cf35b..7973f35fda 100644 --- a/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/DebugOperationHandlerTest.java +++ b/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/TransferDebugOperationHandlerTest.java @@ -26,8 +26,8 @@ import static java.util.function.Function.identity; import static java.util.stream.Collectors.joining; import static java.util.stream.Collectors.toList; import static java.util.stream.Collectors.toMap; -import static org.apache.nifi.c2.client.service.operation.DebugOperationHandler.NEW_LINE; -import static org.apache.nifi.c2.client.service.operation.DebugOperationHandler.TARGET_ARG; +import static org.apache.nifi.c2.client.service.operation.TransferDebugOperationHandler.NEW_LINE; +import static org.apache.nifi.c2.client.service.operation.TransferDebugOperationHandler.TARGET_ARG; import static org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.FULLY_APPLIED; import static org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.NOT_APPLIED; import static org.apache.nifi.c2.protocol.api.OperandType.DEBUG; @@ -68,7 +68,7 @@ import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; @ExtendWith(MockitoExtension.class) -public class DebugOperationHandlerTest { +public class TransferDebugOperationHandlerTest { private static final String OPERATION_ID = "operationId"; private static final String C2_DEBUG_UPLOAD_ENDPOINT = "https://host/c2/api/upload"; @@ -99,13 +99,13 @@ public class DebugOperationHandlerTest { @ParameterizedTest(name = "c2Client={0} bundleFileList={1} contentFilter={2}") @MethodSource("invalidConstructorArguments") public void testAttemptingCreateWithInvalidParametersWillThrowException(C2Client c2Client, List bundleFilePaths, Predicate contentFilter) { - assertThrows(IllegalArgumentException.class, () -> DebugOperationHandler.create(c2Client, bundleFilePaths, contentFilter, operandPropertiesProvider)); + assertThrows(IllegalArgumentException.class, () -> TransferDebugOperationHandler.create(c2Client, operandPropertiesProvider, bundleFilePaths, contentFilter)); } @Test public void testOperationAndOperandTypesAreMatching() { // given - DebugOperationHandler testHandler = DebugOperationHandler.create(c2Client, VALID_BUNDLE_FILE_LIST, DEFAULT_CONTENT_FILTER, operandPropertiesProvider); + TransferDebugOperationHandler testHandler = TransferDebugOperationHandler.create(c2Client, operandPropertiesProvider, VALID_BUNDLE_FILE_LIST, DEFAULT_CONTENT_FILTER); // when + then assertEquals(TRANSFER, testHandler.getOperationType()); @@ -115,7 +115,7 @@ public class DebugOperationHandlerTest { @Test public void testC2CallbackUrlIsNullInArgs() { // given - DebugOperationHandler testHandler = DebugOperationHandler.create(c2Client, VALID_BUNDLE_FILE_LIST, DEFAULT_CONTENT_FILTER, operandPropertiesProvider); + TransferDebugOperationHandler testHandler = TransferDebugOperationHandler.create(c2Client, operandPropertiesProvider, VALID_BUNDLE_FILE_LIST, DEFAULT_CONTENT_FILTER); C2Operation c2Operation = operation(null); // when @@ -134,7 +134,7 @@ public class DebugOperationHandlerTest { List createBundleFiles = bundleFileNamesWithContents.entrySet().stream() .map(entry -> placeFileWithContent(entry.getKey(), entry.getValue())) .collect(toList()); - DebugOperationHandler testHandler = DebugOperationHandler.create(c2Client, createBundleFiles, DEFAULT_CONTENT_FILTER, operandPropertiesProvider); + TransferDebugOperationHandler testHandler = TransferDebugOperationHandler.create(c2Client, operandPropertiesProvider, createBundleFiles, DEFAULT_CONTENT_FILTER); C2Operation c2Operation = operation(C2_DEBUG_UPLOAD_ENDPOINT); // when @@ -154,8 +154,8 @@ public class DebugOperationHandlerTest { @Test public void testFileToCollectDoesNotExist() { // given - DebugOperationHandler testHandler = DebugOperationHandler.create(c2Client, singletonList(Paths.get(tempDir.getAbsolutePath(), "missing_file")), - DEFAULT_CONTENT_FILTER, operandPropertiesProvider); + TransferDebugOperationHandler testHandler = TransferDebugOperationHandler.create(c2Client, operandPropertiesProvider, + singletonList(Paths.get(tempDir.getAbsolutePath(), "missing_file")), DEFAULT_CONTENT_FILTER); C2Operation c2Operation = operation(C2_DEBUG_UPLOAD_ENDPOINT); // when @@ -194,7 +194,7 @@ public class DebugOperationHandlerTest { // given Path bundleFile = placeFileWithContent(fileName, inputContent); Predicate testContentFilter = content -> !content.contains(filterKeyword); - DebugOperationHandler testHandler = DebugOperationHandler.create(c2Client, singletonList(bundleFile), testContentFilter, operandPropertiesProvider); + TransferDebugOperationHandler testHandler = TransferDebugOperationHandler.create(c2Client, operandPropertiesProvider, singletonList(bundleFile), testContentFilter); C2Operation c2Operation = operation(C2_DEBUG_UPLOAD_ENDPOINT); // when diff --git a/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/UpdateAssetOperationHandlerTest.java b/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/UpdateAssetOperationHandlerTest.java new file mode 100644 index 0000000000..b5519abf8d --- /dev/null +++ b/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/UpdateAssetOperationHandlerTest.java @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.c2.client.service.operation; + +import static java.lang.Boolean.FALSE; +import static java.lang.Boolean.TRUE; +import static java.lang.Boolean.parseBoolean; +import static java.util.Optional.empty; +import static org.apache.nifi.c2.client.service.operation.UpdateAssetOperationHandler.ASSET_FILE_KEY; +import static org.apache.nifi.c2.client.service.operation.UpdateAssetOperationHandler.ASSET_FILE_NAME_NOT_FOUND; +import static org.apache.nifi.c2.client.service.operation.UpdateAssetOperationHandler.ASSET_FORCE_DOWNLOAD_KEY; +import static org.apache.nifi.c2.client.service.operation.UpdateAssetOperationHandler.ASSET_URL_KEY; +import static org.apache.nifi.c2.client.service.operation.UpdateAssetOperationHandler.C2_CALLBACK_URL_NOT_FOUND; +import static org.apache.nifi.c2.client.service.operation.UpdateAssetOperationHandler.FAILED_TO_PERSIST_ASSET_TO_DISK; +import static org.apache.nifi.c2.client.service.operation.UpdateAssetOperationHandler.SUCCESSFULLY_UPDATE_ASSET; +import static org.apache.nifi.c2.client.service.operation.UpdateAssetOperationHandler.UPDATE_ASSET_PRECONDITIONS_WERE_NOT_MET; +import static org.apache.nifi.c2.client.service.operation.UpdateAssetOperationHandler.UPDATE_ASSET_RETRIEVAL_RESULTED_IN_EMPTY_CONTENT; +import static org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.FULLY_APPLIED; +import static org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.NOT_APPLIED; +import static org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.NO_OPERATION; +import static org.apache.nifi.c2.protocol.api.OperandType.ASSET; +import static org.apache.nifi.c2.protocol.api.OperationType.UPDATE; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.function.BiFunction; +import java.util.function.BiPredicate; +import java.util.stream.Stream; +import org.apache.nifi.c2.client.api.C2Client; +import org.apache.nifi.c2.protocol.api.C2Operation; +import org.apache.nifi.c2.protocol.api.C2OperationAck; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +public class UpdateAssetOperationHandlerTest { + + private static final String OPERATION_ID = "operationId"; + private static final String ASSET_URL = "http://c2.server/asset/1"; + private static final String ASSET_FILE_NAME = "asset.file"; + private static final String FORCE_DOWNLOAD = "true"; + + @Mock + private C2Client c2Client; + + @Mock + private OperandPropertiesProvider operandPropertiesProvider; + + @Mock + private BiPredicate assetUpdatePrecondition; + + @Mock + private BiFunction assetPersistFunction; + + @InjectMocks + private UpdateAssetOperationHandler testHandler; + + private static Stream invalidConstructorArguments() { + return Stream.of( + Arguments.of(null, null, null, null), + Arguments.of(mock(C2Client.class), null, null, null), + Arguments.of(mock(C2Client.class), mock(OperandPropertiesProvider.class), null, null), + Arguments.of(mock(C2Client.class), mock(OperandPropertiesProvider.class), mock(BiPredicate.class), null)); + } + + @ParameterizedTest(name = "c2Client={0} operandPropertiesProvider={1} bundleFileList={2} contentFilter={3}") + @MethodSource("invalidConstructorArguments") + public void testAttemptingCreateWithInvalidParametersWillThrowException(C2Client c2Client, OperandPropertiesProvider operandPropertiesProvider, + BiPredicate assetUpdatePrecondition, + BiFunction assetPersistFunction) { + assertThrows(IllegalArgumentException.class, () -> UpdateAssetOperationHandler.create(c2Client, operandPropertiesProvider, assetUpdatePrecondition, assetPersistFunction)); + } + + @Test + public void testOperationAndOperandTypesAreMatching() { + assertEquals(UPDATE, testHandler.getOperationType()); + assertEquals(ASSET, testHandler.getOperandType()); + } + + @Test + public void testAssetUrlCanNotBeNull() { + // given + C2Operation operation = operation(null, ASSET_FILE_NAME, FORCE_DOWNLOAD); + + // when + C2OperationAck result = testHandler.handle(operation); + + // then + assertEquals(OPERATION_ID, result.getOperationId()); + assertEquals(NOT_APPLIED, result.getOperationState().getState()); + assertEquals(C2_CALLBACK_URL_NOT_FOUND, result.getOperationState().getDetails()); + } + + @Test + public void testAssetFileNameCanNotBeNull() { + // given + C2Operation operation = operation(ASSET_URL, null, FORCE_DOWNLOAD); + + // when + C2OperationAck result = testHandler.handle(operation); + + // then + assertEquals(OPERATION_ID, result.getOperationId()); + assertEquals(NOT_APPLIED, result.getOperationState().getState()); + assertEquals(ASSET_FILE_NAME_NOT_FOUND, result.getOperationState().getDetails()); + } + + @Test + public void testAssetPreconditionIsNotMet() { + // given + C2Operation operation = operation(ASSET_URL, ASSET_FILE_NAME, FORCE_DOWNLOAD); + when(assetUpdatePrecondition.test(ASSET_FILE_NAME, parseBoolean(FORCE_DOWNLOAD))).thenReturn(FALSE); + + // when + C2OperationAck result = testHandler.handle(operation); + + // then + assertEquals(OPERATION_ID, result.getOperationId()); + assertEquals(NO_OPERATION, result.getOperationState().getState()); + assertEquals(UPDATE_ASSET_PRECONDITIONS_WERE_NOT_MET, result.getOperationState().getDetails()); + } + + @Test + public void testAssetRetrieveReturnsEmptyResult() { + // given + C2Operation operation = operation(ASSET_URL, ASSET_FILE_NAME, FORCE_DOWNLOAD); + when(assetUpdatePrecondition.test(ASSET_FILE_NAME, parseBoolean(FORCE_DOWNLOAD))).thenReturn(TRUE); + when(c2Client.retrieveUpdateContent(ASSET_URL)).thenReturn(empty()); + + // when + C2OperationAck result = testHandler.handle(operation); + + // then + assertEquals(OPERATION_ID, result.getOperationId()); + assertEquals(NOT_APPLIED, result.getOperationState().getState()); + assertEquals(UPDATE_ASSET_RETRIEVAL_RESULTED_IN_EMPTY_CONTENT, result.getOperationState().getDetails()); + } + + @Test + public void testAssetPersistFunctionFails() { + // given + C2Operation operation = operation(ASSET_URL, ASSET_FILE_NAME, FORCE_DOWNLOAD); + when(assetUpdatePrecondition.test(ASSET_FILE_NAME, parseBoolean(FORCE_DOWNLOAD))).thenReturn(TRUE); + byte[] mockUpdateContent = new byte[0]; + when(c2Client.retrieveUpdateContent(ASSET_URL)).thenReturn(Optional.of(mockUpdateContent)); + when(assetPersistFunction.apply(ASSET_FILE_NAME, mockUpdateContent)).thenReturn(FALSE); + + // when + C2OperationAck result = testHandler.handle(operation); + + // then + assertEquals(OPERATION_ID, result.getOperationId()); + assertEquals(NOT_APPLIED, result.getOperationState().getState()); + assertEquals(FAILED_TO_PERSIST_ASSET_TO_DISK, result.getOperationState().getDetails()); + } + + @Test + public void testAssetIsDownloadedAndPersistedSuccessfully() { + // given + C2Operation operation = operation(ASSET_URL, ASSET_FILE_NAME, FORCE_DOWNLOAD); + when(assetUpdatePrecondition.test(ASSET_FILE_NAME, parseBoolean(FORCE_DOWNLOAD))).thenReturn(TRUE); + byte[] mockUpdateContent = new byte[0]; + when(c2Client.retrieveUpdateContent(ASSET_URL)).thenReturn(Optional.of(mockUpdateContent)); + when(assetPersistFunction.apply(ASSET_FILE_NAME, mockUpdateContent)).thenReturn(TRUE); + + // when + C2OperationAck result = testHandler.handle(operation); + + // then + assertEquals(OPERATION_ID, result.getOperationId()); + assertEquals(FULLY_APPLIED, result.getOperationState().getState()); + assertEquals(SUCCESSFULLY_UPDATE_ASSET, result.getOperationState().getDetails()); + } + + private C2Operation operation(String assetUrl, String assetFile, String forceDownload) { + C2Operation c2Operation = new C2Operation(); + c2Operation.setIdentifier(OPERATION_ID); + + Map arguments = new HashMap<>(); + arguments.put(ASSET_URL_KEY, assetUrl); + arguments.put(ASSET_FILE_KEY, assetFile); + arguments.put(ASSET_FORCE_DOWNLOAD_KEY, forceDownload); + c2Operation.setArgs(arguments); + return c2Operation; + } +} diff --git a/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/C2OperationState.java b/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/C2OperationState.java index 219b7edce3..21a44b582c 100644 --- a/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/C2OperationState.java +++ b/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/C2OperationState.java @@ -74,6 +74,7 @@ public class C2OperationState implements Serializable { FULLY_APPLIED, PARTIALLY_APPLIED, OPERATION_NOT_UNDERSTOOD, + NO_OPERATION, NOT_APPLIED; /** @@ -93,6 +94,8 @@ public class C2OperationState implements Serializable { case 2: return OPERATION_NOT_UNDERSTOOD; case 3: + return NO_OPERATION; + case 4: default: return NOT_APPLIED; } @@ -114,9 +117,11 @@ public class C2OperationState implements Serializable { return 1; case OPERATION_NOT_UNDERSTOOD: return 2; + case NO_OPERATION: + return 3; case NOT_APPLIED: default: - return 3; + return 4; } } } diff --git a/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/OperandType.java b/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/OperandType.java index 4638f33b44..1f1cb19035 100644 --- a/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/OperandType.java +++ b/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/OperandType.java @@ -26,7 +26,8 @@ public enum OperandType { CONNECTION, DEBUG, MANIFEST, - REPOSITORY; + REPOSITORY, + ASSET; public static Optional fromString(String value) { return Arrays.stream(values()) diff --git a/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/OperationType.java b/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/OperationType.java index 6fe242fe82..9ee3630aef 100644 --- a/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/OperationType.java +++ b/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/OperationType.java @@ -17,6 +17,7 @@ package org.apache.nifi.c2.protocol.api; +import static org.apache.nifi.c2.protocol.api.OperandType.ASSET; import static org.apache.nifi.c2.protocol.api.OperandType.CONFIGURATION; import static org.apache.nifi.c2.protocol.api.OperandType.CONNECTION; import static org.apache.nifi.c2.protocol.api.OperandType.DEBUG; @@ -35,7 +36,7 @@ public enum OperationType { // C2 Server -> C2 Client Commands CLEAR(CONNECTION), DESCRIBE(MANIFEST), - UPDATE(CONFIGURATION), + UPDATE(CONFIGURATION, ASSET), RESTART, START, STOP, diff --git a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/conf/bootstrap.conf b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/conf/bootstrap.conf index 3606b67002..b1925b6bad 100644 --- a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/conf/bootstrap.conf +++ b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/conf/bootstrap.conf @@ -151,6 +151,8 @@ java.arg.14=-Djava.awt.headless=true #c2.agent.identifier= # If set to false heartbeat won't contain the manifest. Defaults to true. #c2.full.heartbeat=false +# Directory for storing assets downloaded via C2 update/asset command +#c2.asset.directory=./asset ## Define TLS security properties for C2 communications #c2.security.truststore.location= #c2.security.truststore.password= @@ -162,4 +164,4 @@ java.arg.14=-Djava.awt.headless=true # The following ingestor configuration needs to be enabled in order to apply configuration updates coming from C2 server #nifi.minifi.notifier.ingestors=org.apache.nifi.minifi.bootstrap.configuration.ingestors.FileChangeIngestor #nifi.minifi.notifier.ingestors.file.config.path=./conf/config-new.yml -#nifi.minifi.notifier.ingestors.file.polling.period.seconds=5 \ No newline at end of file +#nifi.minifi.notifier.ingestors.file.polling.period.seconds=5 diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/c2/C2NiFiProperties.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/c2/C2NiFiProperties.java index be24847438..840da2219d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/c2/C2NiFiProperties.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/c2/C2NiFiProperties.java @@ -40,6 +40,7 @@ public class C2NiFiProperties { public static final String C2_AGENT_IDENTIFIER_KEY = C2_PREFIX + "agent.identifier"; public static final String C2_FULL_HEARTBEAT_KEY = C2_PREFIX + "full.heartbeat"; public static final String C2_REQUEST_COMPRESSION_KEY = C2_PREFIX + "request.compression"; + public static final String C2_ASSET_DIRECTORY_KEY = C2_PREFIX + "asset.directory"; public static final String C2_ROOT_CLASS_DEFINITIONS_KEY = C2_PREFIX + "root.class.definitions"; public static final String C2_METRICS_NAME_KEY = C2_ROOT_CLASS_DEFINITIONS_KEY + ".metrics.name"; @@ -73,4 +74,6 @@ public class C2NiFiProperties { // C2 request compression is turned off by default public static final String C2_REQUEST_COMPRESSION= "none"; + + public static final String C2_ASSET_DIRECTORY = "./asset"; } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/c2/C2NifiClientService.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/c2/C2NifiClientService.java index 0602a13f3e..795b3b4323 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/c2/C2NifiClientService.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/c2/C2NifiClientService.java @@ -18,28 +18,19 @@ package org.apache.nifi.c2; import static java.util.Optional.ofNullable; -import static java.util.stream.Collectors.collectingAndThen; -import static java.util.stream.Collectors.toList; -import static java.util.stream.Collectors.toSet; import java.io.File; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; -import java.nio.file.Paths; import java.util.Arrays; import java.util.Collection; -import java.util.Collections; import java.util.HashMap; import java.util.Iterator; -import java.util.List; import java.util.Map; -import java.util.Set; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import java.util.function.Predicate; -import java.util.stream.Stream; import org.apache.nifi.c2.client.C2ClientConfig; import org.apache.nifi.c2.client.http.C2HttpClient; import org.apache.nifi.c2.client.service.C2ClientService; @@ -48,12 +39,15 @@ import org.apache.nifi.c2.client.service.FlowIdHolder; import org.apache.nifi.c2.client.service.ManifestHashProvider; import org.apache.nifi.c2.client.service.model.RuntimeInfoWrapper; import org.apache.nifi.c2.client.service.operation.C2OperationService; -import org.apache.nifi.c2.client.service.operation.DebugOperationHandler; import org.apache.nifi.c2.client.service.operation.DescribeManifestOperationHandler; import org.apache.nifi.c2.client.service.operation.EmptyOperandPropertiesProvider; import org.apache.nifi.c2.client.service.operation.OperandPropertiesProvider; import org.apache.nifi.c2.client.service.operation.SupportedOperationsProvider; +import org.apache.nifi.c2.client.service.operation.TransferDebugOperationHandler; +import org.apache.nifi.c2.client.service.operation.UpdateAssetOperationHandler; import org.apache.nifi.c2.client.service.operation.UpdateConfigurationOperationHandler; +import org.apache.nifi.c2.command.TransferDebugCommandHelper; +import org.apache.nifi.c2.command.UpdateAssetCommandHelper; import org.apache.nifi.c2.protocol.api.AgentManifest; import org.apache.nifi.c2.protocol.api.AgentRepositories; import org.apache.nifi.c2.protocol.api.AgentRepositoryStatus; @@ -76,22 +70,6 @@ import org.slf4j.LoggerFactory; public class C2NifiClientService { - public static final Set SENSITIVE_PROPERTY_KEYWORDS = - Stream.of("key:", "algorithm:", "secret.key", "sensitive.props.key", "sensitive.props.algorithm", "secret", "password", "passwd") - .map(String::toLowerCase) - .collect(collectingAndThen(toSet(), Collections::unmodifiableSet)); - public static final Predicate EXCLUDE_SENSITIVE_TEXT = text -> - ofNullable(text) - .map(String::toLowerCase) - .map(t -> SENSITIVE_PROPERTY_KEYWORDS.stream().noneMatch(keyword -> t.contains(keyword))) - .orElse(true); - - private static final String MINIFI_CONFIG_FILE_PATH = "nifi.minifi.config.file"; - private static final String MINIFI_BOOTSTRAP_FILE_PATH = "nifi.minifi.bootstrap.file"; - private static final String MINIFI_LOG_DIRECTORY = "nifi.minifi.log.directory"; - private static final String MINIFI_APP_LOG_FILE = "nifi.minifi.app.log.file"; - private static final String MINIFI_BOOTSTRAP_LOG_FILE = "nifi.minifi.bootstrap.log.file"; - private static final Logger logger = LoggerFactory.getLogger(C2NifiClientService.class); private static final String DEFAULT_CONF_DIR = "./conf"; @@ -124,19 +102,22 @@ public class C2NifiClientService { ); this.heartbeatPeriod = clientConfig.getHeartbeatPeriod(); this.flowController = flowController; + C2HttpClient client = new C2HttpClient(clientConfig, new C2JacksonSerializer()); C2HeartbeatFactory heartbeatFactory = new C2HeartbeatFactory(clientConfig, flowIdHolder, new ManifestHashProvider()); OperandPropertiesProvider emptyOperandPropertiesProvider = new EmptyOperandPropertiesProvider(); + TransferDebugCommandHelper transferDebugCommandHelper = new TransferDebugCommandHelper(niFiProperties); + UpdateAssetCommandHelper updateAssetCommandHelper = new UpdateAssetCommandHelper(clientConfig.getC2AssetDirectory()); + updateAssetCommandHelper.createAssetDirectory(); C2OperationService c2OperationService = new C2OperationService(Arrays.asList( new UpdateConfigurationOperationHandler(client, flowIdHolder, this::updateFlowContent, emptyOperandPropertiesProvider), new DescribeManifestOperationHandler(heartbeatFactory, this::generateRuntimeInfo, emptyOperandPropertiesProvider), - DebugOperationHandler.create(client, debugBundleFiles(niFiProperties), EXCLUDE_SENSITIVE_TEXT, emptyOperandPropertiesProvider) + TransferDebugOperationHandler.create(client, emptyOperandPropertiesProvider, + transferDebugCommandHelper.debugBundleFiles(), transferDebugCommandHelper::excludeSensitiveText), + UpdateAssetOperationHandler.create(client, emptyOperandPropertiesProvider, + updateAssetCommandHelper::assetUpdatePrecondition, updateAssetCommandHelper::assetPersistFunction) )); - this.c2ClientService = new C2ClientService( - client, - heartbeatFactory, - c2OperationService - ); + this.c2ClientService = new C2ClientService(client, heartbeatFactory, c2OperationService); this.supportedOperationsProvider = new SupportedOperationsProvider(c2OperationService.getHandlers()); } @@ -155,7 +136,8 @@ public class C2NifiClientService { C2NiFiProperties.C2_DEFAULT_CALL_TIMEOUT), TimeUnit.MILLISECONDS)) .c2Url(properties.getProperty(C2NiFiProperties.C2_REST_URL_KEY, "")) .c2RequestCompression(properties.getProperty(C2NiFiProperties.C2_REQUEST_COMPRESSION_KEY, C2NiFiProperties.C2_REQUEST_COMPRESSION)) - .confDirectory(properties.getProperty(C2NiFiProperties.C2_CONFIG_DIRECTORY_KEY, DEFAULT_CONF_DIR)) + .c2AssetDirectory(properties.getProperty(C2NiFiProperties.C2_ASSET_DIRECTORY_KEY, C2NiFiProperties.C2_ASSET_DIRECTORY)) + .confDirectory(properties.getProperty(C2NiFiProperties.C2_CONFIG_DIRECTORY_KEY, DEFAULT_CONF_DIR)) .runtimeManifestIdentifier(properties.getProperty(C2NiFiProperties.C2_RUNTIME_MANIFEST_IDENTIFIER_KEY, "")) .runtimeType(properties.getProperty(C2NiFiProperties.C2_RUNTIME_TYPE_KEY, "")) .c2AckUrl(properties.getProperty(C2NiFiProperties.C2_REST_URL_ACK_KEY, "")) @@ -251,15 +233,4 @@ public class C2NifiClientService { .map(parentDir -> new File(parentDir + TARGET_CONFIG_FILE)) .orElse(new File(DEFAULT_CONF_DIR + TARGET_CONFIG_FILE)); } - - private List debugBundleFiles(NiFiProperties properties) { - return Stream.of( - Paths.get(properties.getProperty(MINIFI_CONFIG_FILE_PATH)), - Paths.get(properties.getProperty(MINIFI_BOOTSTRAP_FILE_PATH)), - Paths.get(properties.getProperty(MINIFI_LOG_DIRECTORY), properties.getProperty(MINIFI_APP_LOG_FILE)), - Paths.get(properties.getProperty(MINIFI_LOG_DIRECTORY), properties.getProperty(MINIFI_BOOTSTRAP_LOG_FILE))) - .filter(Files::exists) - .filter(Files::isRegularFile) - .collect(toList()); - } -} +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/c2/command/TransferDebugCommandHelper.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/c2/command/TransferDebugCommandHelper.java new file mode 100644 index 0000000000..a92517e6f9 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/c2/command/TransferDebugCommandHelper.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.c2.command; + +import static java.util.Optional.ofNullable; +import static java.util.stream.Collectors.collectingAndThen; +import static java.util.stream.Collectors.toList; +import static java.util.stream.Collectors.toSet; + +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.stream.Stream; +import org.apache.nifi.util.NiFiProperties; + +public class TransferDebugCommandHelper { + + private static final String MINIFI_CONFIG_FILE_PATH = "nifi.minifi.config.file"; + private static final String MINIFI_BOOTSTRAP_FILE_PATH = "nifi.minifi.bootstrap.file"; + private static final String MINIFI_LOG_DIRECTORY = "nifi.minifi.log.directory"; + private static final String MINIFI_APP_LOG_FILE = "nifi.minifi.app.log.file"; + private static final String MINIFI_BOOTSTRAP_LOG_FILE = "nifi.minifi.bootstrap.log.file"; + + private static final Set SENSITIVE_PROPERTY_KEYWORDS = + Stream.of("key:", "algorithm:", "secret.key", "sensitive.props.key", "sensitive.props.algorithm", "secret", "password", "passwd") + .map(String::toLowerCase) + .collect(collectingAndThen(toSet(), Collections::unmodifiableSet)); + + private final NiFiProperties niFiProperties; + + public TransferDebugCommandHelper(NiFiProperties niFiProperties) { + this.niFiProperties = niFiProperties; + } + + public List debugBundleFiles() { + return Stream.of( + Paths.get(niFiProperties.getProperty(MINIFI_CONFIG_FILE_PATH)), + Paths.get(niFiProperties.getProperty(MINIFI_BOOTSTRAP_FILE_PATH)), + Paths.get(niFiProperties.getProperty(MINIFI_LOG_DIRECTORY), niFiProperties.getProperty(MINIFI_APP_LOG_FILE)), + Paths.get(niFiProperties.getProperty(MINIFI_LOG_DIRECTORY), niFiProperties.getProperty(MINIFI_BOOTSTRAP_LOG_FILE))) + .filter(Files::exists) + .filter(Files::isRegularFile) + .collect(toList()); + } + + public boolean excludeSensitiveText(String text) { + return ofNullable(text) + .map(String::toLowerCase) + .map(t -> SENSITIVE_PROPERTY_KEYWORDS.stream().noneMatch(keyword -> t.contains(keyword))) + .orElse(true); + } +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/c2/command/UpdateAssetCommandHelper.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/c2/command/UpdateAssetCommandHelper.java new file mode 100644 index 0000000000..2231dd481b --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/c2/command/UpdateAssetCommandHelper.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.c2.command; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class UpdateAssetCommandHelper { + + private static final Logger LOG = LoggerFactory.getLogger(UpdateAssetCommandHelper.class); + + private final String assetDirectory; + + public UpdateAssetCommandHelper(String assetDirectory) { + this.assetDirectory = assetDirectory; + } + + public void createAssetDirectory() { + try { + Files.createDirectories(Paths.get(assetDirectory)); + } catch (IOException ioe) { + LOG.error("Unable to create asset directory {}", assetDirectory); + throw new UncheckedIOException("Unable to create directory", ioe); + } + } + + public boolean assetUpdatePrecondition(String assetFileName, Boolean forceDownload) { + Path assetPath = Paths.get(assetDirectory, assetFileName); + if (Files.exists(assetPath) && !forceDownload) { + LOG.info("Asset file already exists on path {}. Asset won't be downloaded", assetPath); + return false; + } + LOG.info("Asset file does not exist or force download is on. Asset will be downloaded to {}", assetPath); + return true; + } + + public boolean assetPersistFunction(String assetFileName, byte[] assetBinary) { + Path assetPath = Paths.get(assetDirectory, assetFileName); + try { + Files.deleteIfExists(assetPath); + Files.write(assetPath, assetBinary); + LOG.info("Asset was persisted to {}, {} bytes were written", assetPath, assetBinary.length); + return true; + } catch (IOException e) { + LOG.error("Persisting asset failed. File creation was not successful targeting {}", assetPath, e); + return false; + } + } +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/c2/C2NifiClientServiceTest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/c2/command/TransferDebugCommandHelperTest.java similarity index 81% rename from nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/c2/C2NifiClientServiceTest.java rename to nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/c2/command/TransferDebugCommandHelperTest.java index 8d58ea5b22..2dfbb68002 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/c2/C2NifiClientServiceTest.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/c2/command/TransferDebugCommandHelperTest.java @@ -14,17 +14,27 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.nifi.c2; -import static org.apache.nifi.c2.C2NifiClientService.EXCLUDE_SENSITIVE_TEXT; +package org.apache.nifi.c2.command; + import static org.junit.jupiter.api.Assertions.assertEquals; import java.util.stream.Stream; +import org.apache.nifi.util.NiFiProperties; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; -public class C2NifiClientServiceTest { +public class TransferDebugCommandHelperTest { + + private TransferDebugCommandHelper transferDebugCommandHelper; + + @BeforeEach + public void setUp() { + NiFiProperties niFiProperties = new NiFiProperties(); + transferDebugCommandHelper = new TransferDebugCommandHelper(niFiProperties); + } private static Stream textAndExpectedResult() { return Stream.of( @@ -51,6 +61,6 @@ public class C2NifiClientServiceTest { @ParameterizedTest @MethodSource("textAndExpectedResult") public void testSensitiveTextIsExcluded(String propertyName, boolean expectedResult) { - assertEquals(expectedResult, EXCLUDE_SENSITIVE_TEXT.test(propertyName)); + assertEquals(expectedResult, transferDebugCommandHelper.excludeSensitiveText(propertyName)); } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/c2/command/UpdateAssetCommandHelperTest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/c2/command/UpdateAssetCommandHelperTest.java new file mode 100644 index 0000000000..ffa78c505f --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/c2/command/UpdateAssetCommandHelperTest.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.c2.command; + +import static java.lang.Boolean.FALSE; +import static java.lang.Boolean.TRUE; +import static java.nio.charset.Charset.defaultCharset; +import static java.nio.charset.StandardCharsets.UTF_8; +import static java.nio.file.Files.exists; +import static java.nio.file.Files.isDirectory; +import static java.nio.file.Files.readAllLines; +import static java.nio.file.Files.write; +import static java.util.Collections.singletonList; +import static org.apache.commons.lang3.StringUtils.EMPTY; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertIterableEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.io.File; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.file.Path; +import java.nio.file.Paths; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +public class UpdateAssetCommandHelperTest { + + private static final String ASSET_DIRECTORY = "asset_directory"; + private static final String ASSET_FILE = "asset.file"; + + @TempDir + private File tempDir; + + private Path assetDirectory; + private UpdateAssetCommandHelper updateAssetCommandHelper; + + @BeforeEach + public void setUp() { + assetDirectory = Paths.get(tempDir.getAbsolutePath(), ASSET_DIRECTORY); + updateAssetCommandHelper = new UpdateAssetCommandHelper(assetDirectory.toString()); + } + + @Test + public void testAssetDirectoryShouldBeCreated() { + // given + when + updateAssetCommandHelper.createAssetDirectory(); + + // then + assertTrue(exists(assetDirectory)); + assertTrue(isDirectory(assetDirectory)); + } + + @Test + public void testAssetFileDoesNotExist() { + // given + updateAssetCommandHelper.createAssetDirectory(); + + // when + boolean result = updateAssetCommandHelper.assetUpdatePrecondition(ASSET_FILE, FALSE); + + // then + assertTrue(result); + } + + @Test + public void testAssetFileExistsAndForceDownloadOff() { + // given + updateAssetCommandHelper.createAssetDirectory(); + touchAssetFile(); + + // when + boolean result = updateAssetCommandHelper.assetUpdatePrecondition(ASSET_FILE, FALSE); + + // then + assertFalse(result); + } + + @Test + public void testAssetFileExistsAndForceDownloadOn() { + // given + updateAssetCommandHelper.createAssetDirectory(); + touchAssetFile(); + + // when + boolean result = updateAssetCommandHelper.assetUpdatePrecondition(ASSET_FILE, TRUE); + + // then + assertTrue(result); + } + + @Test + public void testAssetPersistedCorrectly() throws IOException { + // given + updateAssetCommandHelper.createAssetDirectory(); + String testAssetContent = "test file content"; + + // when + boolean result = updateAssetCommandHelper.assetPersistFunction(ASSET_FILE, testAssetContent.getBytes(defaultCharset())); + + // then + assertTrue(result); + assertIterableEquals(singletonList(testAssetContent), readAllLines(assetDirectory.resolve(ASSET_FILE), defaultCharset())); + } + + @Test + public void testAssetDirectoryDoesNotExistWhenPersistingAsset() { + // given + String testAssetContent = "test file content"; + + // when + boolean result = updateAssetCommandHelper.assetPersistFunction(ASSET_FILE, testAssetContent.getBytes(defaultCharset())); + + // then + assertFalse(result); + assertFalse(exists(assetDirectory.resolve(ASSET_FILE))); + } + + private void touchAssetFile() { + try { + write(Paths.get(assetDirectory.toString(), ASSET_FILE), EMPTY.getBytes(UTF_8)); + } catch (IOException e) { + throw new UncheckedIOException("Failed to touch file", e); + } + } +}