mirror of https://github.com/apache/nifi.git
NIFI-10679 MiNiFi C2 Update Asset Command
Signed-off-by: Csaba Bejan <bejan.csaba@gmail.com> This closes #6583.
This commit is contained in:
parent
438d8ec1eb
commit
65558accda
|
@ -41,6 +41,7 @@ public class C2ClientConfig {
|
|||
private final long readTimeout;
|
||||
private final long connectTimeout;
|
||||
private final String c2RequestCompression;
|
||||
private final String c2AssetDirectory;
|
||||
|
||||
private C2ClientConfig(final Builder builder) {
|
||||
this.c2Url = builder.c2Url;
|
||||
|
@ -63,6 +64,7 @@ public class C2ClientConfig {
|
|||
this.readTimeout = builder.readTimeout;
|
||||
this.connectTimeout = builder.connectTimeout;
|
||||
this.c2RequestCompression = builder.c2RequestCompression;
|
||||
this.c2AssetDirectory = builder.c2AssetDirectory;
|
||||
}
|
||||
|
||||
public String getC2Url() {
|
||||
|
@ -145,6 +147,10 @@ public class C2ClientConfig {
|
|||
return c2RequestCompression;
|
||||
}
|
||||
|
||||
public String getC2AssetDirectory() {
|
||||
return c2AssetDirectory;
|
||||
}
|
||||
|
||||
/**
|
||||
* Builder for client configuration.
|
||||
*/
|
||||
|
@ -170,6 +176,7 @@ public class C2ClientConfig {
|
|||
private long readTimeout;
|
||||
private long connectTimeout;
|
||||
private String c2RequestCompression;
|
||||
private String c2AssetDirectory;
|
||||
|
||||
public Builder c2Url(final String c2Url) {
|
||||
this.c2Url = c2Url;
|
||||
|
@ -271,6 +278,11 @@ public class C2ClientConfig {
|
|||
return this;
|
||||
}
|
||||
|
||||
public Builder c2AssetDirectory(final String c2AssetDirectory) {
|
||||
this.c2AssetDirectory = c2AssetDirectory;
|
||||
return this;
|
||||
}
|
||||
|
||||
public C2ClientConfig build() {
|
||||
return new C2ClientConfig(this);
|
||||
}
|
||||
|
|
|
@ -21,6 +21,7 @@ import static java.nio.file.Files.copy;
|
|||
import static java.nio.file.Files.createTempDirectory;
|
||||
import static java.nio.file.Files.lines;
|
||||
import static java.nio.file.Files.walk;
|
||||
import static java.util.Collections.emptyMap;
|
||||
import static java.util.Optional.empty;
|
||||
import static java.util.Optional.ofNullable;
|
||||
import static org.apache.commons.compress.utils.IOUtils.closeQuietly;
|
||||
|
@ -57,9 +58,9 @@ import org.apache.nifi.c2.protocol.api.OperationType;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
public class DebugOperationHandler implements C2OperationHandler {
|
||||
public class TransferDebugOperationHandler implements C2OperationHandler {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(DebugOperationHandler.class);
|
||||
private static final Logger LOG = LoggerFactory.getLogger(TransferDebugOperationHandler.class);
|
||||
|
||||
private static final String C2_CALLBACK_URL_NOT_FOUND = "C2 Server callback URL was not found in request";
|
||||
private static final String SUCCESSFUL_UPLOAD = "Debug bundle was uploaded successfully";
|
||||
|
@ -69,31 +70,33 @@ public class DebugOperationHandler implements C2OperationHandler {
|
|||
static final String NEW_LINE = "\n";
|
||||
|
||||
private final C2Client c2Client;
|
||||
private final OperandPropertiesProvider operandPropertiesProvider;
|
||||
private final List<Path> bundleFilePaths;
|
||||
private final Predicate<String> contentFilter;
|
||||
private final OperandPropertiesProvider operandPropertiesProvider;
|
||||
|
||||
private DebugOperationHandler(C2Client c2Client, List<Path> bundleFilePaths, Predicate<String> contentFilter,
|
||||
OperandPropertiesProvider operandPropertiesProvider) {
|
||||
private TransferDebugOperationHandler(C2Client c2Client, OperandPropertiesProvider operandPropertiesProvider,
|
||||
List<Path> bundleFilePaths, Predicate<String> contentFilter) {
|
||||
this.c2Client = c2Client;
|
||||
this.operandPropertiesProvider = operandPropertiesProvider;
|
||||
this.bundleFilePaths = bundleFilePaths;
|
||||
this.contentFilter = contentFilter;
|
||||
this.operandPropertiesProvider = operandPropertiesProvider;
|
||||
}
|
||||
|
||||
public static DebugOperationHandler create(C2Client c2Client, List<Path> bundleFilePaths, Predicate<String> contentFilter,
|
||||
OperandPropertiesProvider operandPropertiesProvider) {
|
||||
public static TransferDebugOperationHandler create(C2Client c2Client, OperandPropertiesProvider operandPropertiesProvider,
|
||||
List<Path> bundleFilePaths, Predicate<String> contentFilter) {
|
||||
if (c2Client == null) {
|
||||
throw new IllegalArgumentException("C2Client should not be null");
|
||||
}
|
||||
if (operandPropertiesProvider == null) {
|
||||
throw new IllegalArgumentException("OperandPropertiesProvider should not be not null");
|
||||
}
|
||||
if (bundleFilePaths == null || bundleFilePaths.isEmpty()) {
|
||||
throw new IllegalArgumentException("bundleFilePaths should not be not null or empty");
|
||||
}
|
||||
if (contentFilter == null) {
|
||||
throw new IllegalArgumentException("Content filter should not be null");
|
||||
}
|
||||
|
||||
return new DebugOperationHandler(c2Client, bundleFilePaths, contentFilter, operandPropertiesProvider);
|
||||
return new TransferDebugOperationHandler(c2Client, operandPropertiesProvider, bundleFilePaths, contentFilter);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -113,7 +116,7 @@ public class DebugOperationHandler implements C2OperationHandler {
|
|||
|
||||
@Override
|
||||
public C2OperationAck handle(C2Operation operation) {
|
||||
String debugCallbackUrl = operation.getArgs().get(TARGET_ARG);
|
||||
String debugCallbackUrl = ofNullable(operation.getArgs()).orElse(emptyMap()).get(TARGET_ARG);
|
||||
if (debugCallbackUrl == null) {
|
||||
LOG.error("Callback URL was not found in C2 request.");
|
||||
return operationAck(operation, operationState(NOT_APPLIED, C2_CALLBACK_URL_NOT_FOUND));
|
|
@ -0,0 +1,150 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.nifi.c2.client.service.operation;
|
||||
|
||||
import static java.lang.Boolean.parseBoolean;
|
||||
import static java.util.Collections.emptyMap;
|
||||
import static java.util.Optional.ofNullable;
|
||||
import static org.apache.commons.lang3.StringUtils.EMPTY;
|
||||
import static org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.FULLY_APPLIED;
|
||||
import static org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.NOT_APPLIED;
|
||||
import static org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.NO_OPERATION;
|
||||
import static org.apache.nifi.c2.protocol.api.OperandType.ASSET;
|
||||
import static org.apache.nifi.c2.protocol.api.OperationType.UPDATE;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.function.BiFunction;
|
||||
import java.util.function.BiPredicate;
|
||||
import org.apache.nifi.c2.client.api.C2Client;
|
||||
import org.apache.nifi.c2.protocol.api.C2Operation;
|
||||
import org.apache.nifi.c2.protocol.api.C2OperationAck;
|
||||
import org.apache.nifi.c2.protocol.api.C2OperationState;
|
||||
import org.apache.nifi.c2.protocol.api.C2OperationState.OperationState;
|
||||
import org.apache.nifi.c2.protocol.api.OperandType;
|
||||
import org.apache.nifi.c2.protocol.api.OperationType;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
public class UpdateAssetOperationHandler implements C2OperationHandler {
|
||||
|
||||
static final String ASSET_URL_KEY = "url";
|
||||
static final String ASSET_FILE_KEY = "file";
|
||||
static final String ASSET_FORCE_DOWNLOAD_KEY = "forceDownload";
|
||||
|
||||
static final String C2_CALLBACK_URL_NOT_FOUND = "C2 Server callback URL was not found in request";
|
||||
static final String ASSET_FILE_NAME_NOT_FOUND = "Asset file name was not found in request";
|
||||
static final String SUCCESSFULLY_UPDATE_ASSET = "Successfully update asset";
|
||||
static final String FAILED_TO_PERSIST_ASSET_TO_DISK = "Failed to persist asset to disk";
|
||||
static final String UPDATE_ASSET_RETRIEVAL_RESULTED_IN_EMPTY_CONTENT = "Update asset retrieval resulted in empty content";
|
||||
static final String UPDATE_ASSET_PRECONDITIONS_WERE_NOT_MET = "Update asset preconditions were not met. No update will be performed";
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(UpdateAssetOperationHandler.class);
|
||||
|
||||
private final C2Client client;
|
||||
private final OperandPropertiesProvider operandPropertiesProvider;
|
||||
private final BiPredicate<String, Boolean> assetUpdatePrecondition;
|
||||
private final BiFunction<String, byte[], Boolean> assetPersistFunction;
|
||||
|
||||
public UpdateAssetOperationHandler(C2Client client, OperandPropertiesProvider operandPropertiesProvider,
|
||||
BiPredicate<String, Boolean> assetUpdatePrecondition, BiFunction<String, byte[], Boolean> assetPersistFunction) {
|
||||
this.client = client;
|
||||
this.operandPropertiesProvider = operandPropertiesProvider;
|
||||
this.assetUpdatePrecondition = assetUpdatePrecondition;
|
||||
this.assetPersistFunction = assetPersistFunction;
|
||||
}
|
||||
|
||||
public static UpdateAssetOperationHandler create(C2Client c2Client, OperandPropertiesProvider operandPropertiesProvider,
|
||||
BiPredicate<String, Boolean> assetUpdatePrecondition, BiFunction<String, byte[], Boolean> assetPersistFunction) {
|
||||
if (c2Client == null) {
|
||||
throw new IllegalArgumentException("C2Client should not be null");
|
||||
}
|
||||
if (operandPropertiesProvider == null) {
|
||||
throw new IllegalArgumentException("OperandPropertiesProvider should not be not null");
|
||||
}
|
||||
if (assetUpdatePrecondition == null) {
|
||||
throw new IllegalArgumentException("Asset update precondition should not be null");
|
||||
}
|
||||
if (assetPersistFunction == null) {
|
||||
throw new IllegalArgumentException("Asset persist function should not be null");
|
||||
}
|
||||
|
||||
return new UpdateAssetOperationHandler(c2Client, operandPropertiesProvider, assetUpdatePrecondition, assetPersistFunction);
|
||||
}
|
||||
|
||||
@Override
|
||||
public OperationType getOperationType() {
|
||||
return UPDATE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public OperandType getOperandType() {
|
||||
return ASSET;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, Object> getProperties() {
|
||||
return operandPropertiesProvider.getProperties();
|
||||
}
|
||||
|
||||
@Override
|
||||
public C2OperationAck handle(C2Operation operation) {
|
||||
String operationId = ofNullable(operation.getIdentifier()).orElse(EMPTY);
|
||||
|
||||
String assetUrl = getOperationArg(operation, ASSET_URL_KEY);
|
||||
if (assetUrl == null) {
|
||||
LOG.error("Callback URL with key={} was not found in C2 request. C2 request arguments={}", ASSET_URL_KEY, operation.getArgs());
|
||||
return operationAck(operationId, operationState(NOT_APPLIED, C2_CALLBACK_URL_NOT_FOUND));
|
||||
}
|
||||
String assetFileName = getOperationArg(operation, ASSET_FILE_KEY);
|
||||
if (assetFileName == null) {
|
||||
LOG.error("Asset file name with key={} was not found in C2 request. C2 request arguments={}", ASSET_FILE_KEY, operation.getArgs());
|
||||
return operationAck(operationId, operationState(NOT_APPLIED, ASSET_FILE_NAME_NOT_FOUND));
|
||||
}
|
||||
boolean forceDownload = parseBoolean(getOperationArg(operation, ASSET_FORCE_DOWNLOAD_KEY));
|
||||
|
||||
LOG.info("Initiating asset update from url {} with name {}, force update is {}", assetUrl, assetFileName, forceDownload);
|
||||
|
||||
C2OperationState operationState = assetUpdatePrecondition.test(assetFileName, forceDownload)
|
||||
? client.retrieveUpdateContent(assetUrl)
|
||||
.map(content -> assetPersistFunction.apply(assetFileName, content)
|
||||
? operationState(FULLY_APPLIED, SUCCESSFULLY_UPDATE_ASSET)
|
||||
: operationState(NOT_APPLIED, FAILED_TO_PERSIST_ASSET_TO_DISK))
|
||||
.orElseGet(() -> operationState(NOT_APPLIED, UPDATE_ASSET_RETRIEVAL_RESULTED_IN_EMPTY_CONTENT))
|
||||
: operationState(NO_OPERATION, UPDATE_ASSET_PRECONDITIONS_WERE_NOT_MET);
|
||||
|
||||
return operationAck(operationId, operationState);
|
||||
}
|
||||
|
||||
private String getOperationArg(C2Operation operation, String argument) {
|
||||
return ofNullable(operation.getArgs()).orElse(emptyMap()).get(argument);
|
||||
}
|
||||
|
||||
private C2OperationState operationState(OperationState operationState, String details) {
|
||||
C2OperationState state = new C2OperationState();
|
||||
state.setState(operationState);
|
||||
state.setDetails(details);
|
||||
return state;
|
||||
}
|
||||
|
||||
private C2OperationAck operationAck(String operationId, C2OperationState operationState) {
|
||||
C2OperationAck operationAck = new C2OperationAck();
|
||||
operationAck.setOperationState(operationState);
|
||||
operationAck.setOperationId(operationId);
|
||||
return operationAck;
|
||||
}
|
||||
}
|
|
@ -26,8 +26,8 @@ import static java.util.function.Function.identity;
|
|||
import static java.util.stream.Collectors.joining;
|
||||
import static java.util.stream.Collectors.toList;
|
||||
import static java.util.stream.Collectors.toMap;
|
||||
import static org.apache.nifi.c2.client.service.operation.DebugOperationHandler.NEW_LINE;
|
||||
import static org.apache.nifi.c2.client.service.operation.DebugOperationHandler.TARGET_ARG;
|
||||
import static org.apache.nifi.c2.client.service.operation.TransferDebugOperationHandler.NEW_LINE;
|
||||
import static org.apache.nifi.c2.client.service.operation.TransferDebugOperationHandler.TARGET_ARG;
|
||||
import static org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.FULLY_APPLIED;
|
||||
import static org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.NOT_APPLIED;
|
||||
import static org.apache.nifi.c2.protocol.api.OperandType.DEBUG;
|
||||
|
@ -68,7 +68,7 @@ import org.mockito.Mock;
|
|||
import org.mockito.junit.jupiter.MockitoExtension;
|
||||
|
||||
@ExtendWith(MockitoExtension.class)
|
||||
public class DebugOperationHandlerTest {
|
||||
public class TransferDebugOperationHandlerTest {
|
||||
|
||||
private static final String OPERATION_ID = "operationId";
|
||||
private static final String C2_DEBUG_UPLOAD_ENDPOINT = "https://host/c2/api/upload";
|
||||
|
@ -99,13 +99,13 @@ public class DebugOperationHandlerTest {
|
|||
@ParameterizedTest(name = "c2Client={0} bundleFileList={1} contentFilter={2}")
|
||||
@MethodSource("invalidConstructorArguments")
|
||||
public void testAttemptingCreateWithInvalidParametersWillThrowException(C2Client c2Client, List<Path> bundleFilePaths, Predicate<String> contentFilter) {
|
||||
assertThrows(IllegalArgumentException.class, () -> DebugOperationHandler.create(c2Client, bundleFilePaths, contentFilter, operandPropertiesProvider));
|
||||
assertThrows(IllegalArgumentException.class, () -> TransferDebugOperationHandler.create(c2Client, operandPropertiesProvider, bundleFilePaths, contentFilter));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testOperationAndOperandTypesAreMatching() {
|
||||
// given
|
||||
DebugOperationHandler testHandler = DebugOperationHandler.create(c2Client, VALID_BUNDLE_FILE_LIST, DEFAULT_CONTENT_FILTER, operandPropertiesProvider);
|
||||
TransferDebugOperationHandler testHandler = TransferDebugOperationHandler.create(c2Client, operandPropertiesProvider, VALID_BUNDLE_FILE_LIST, DEFAULT_CONTENT_FILTER);
|
||||
|
||||
// when + then
|
||||
assertEquals(TRANSFER, testHandler.getOperationType());
|
||||
|
@ -115,7 +115,7 @@ public class DebugOperationHandlerTest {
|
|||
@Test
|
||||
public void testC2CallbackUrlIsNullInArgs() {
|
||||
// given
|
||||
DebugOperationHandler testHandler = DebugOperationHandler.create(c2Client, VALID_BUNDLE_FILE_LIST, DEFAULT_CONTENT_FILTER, operandPropertiesProvider);
|
||||
TransferDebugOperationHandler testHandler = TransferDebugOperationHandler.create(c2Client, operandPropertiesProvider, VALID_BUNDLE_FILE_LIST, DEFAULT_CONTENT_FILTER);
|
||||
C2Operation c2Operation = operation(null);
|
||||
|
||||
// when
|
||||
|
@ -134,7 +134,7 @@ public class DebugOperationHandlerTest {
|
|||
List<Path> createBundleFiles = bundleFileNamesWithContents.entrySet().stream()
|
||||
.map(entry -> placeFileWithContent(entry.getKey(), entry.getValue()))
|
||||
.collect(toList());
|
||||
DebugOperationHandler testHandler = DebugOperationHandler.create(c2Client, createBundleFiles, DEFAULT_CONTENT_FILTER, operandPropertiesProvider);
|
||||
TransferDebugOperationHandler testHandler = TransferDebugOperationHandler.create(c2Client, operandPropertiesProvider, createBundleFiles, DEFAULT_CONTENT_FILTER);
|
||||
C2Operation c2Operation = operation(C2_DEBUG_UPLOAD_ENDPOINT);
|
||||
|
||||
// when
|
||||
|
@ -154,8 +154,8 @@ public class DebugOperationHandlerTest {
|
|||
@Test
|
||||
public void testFileToCollectDoesNotExist() {
|
||||
// given
|
||||
DebugOperationHandler testHandler = DebugOperationHandler.create(c2Client, singletonList(Paths.get(tempDir.getAbsolutePath(), "missing_file")),
|
||||
DEFAULT_CONTENT_FILTER, operandPropertiesProvider);
|
||||
TransferDebugOperationHandler testHandler = TransferDebugOperationHandler.create(c2Client, operandPropertiesProvider,
|
||||
singletonList(Paths.get(tempDir.getAbsolutePath(), "missing_file")), DEFAULT_CONTENT_FILTER);
|
||||
C2Operation c2Operation = operation(C2_DEBUG_UPLOAD_ENDPOINT);
|
||||
|
||||
// when
|
||||
|
@ -194,7 +194,7 @@ public class DebugOperationHandlerTest {
|
|||
// given
|
||||
Path bundleFile = placeFileWithContent(fileName, inputContent);
|
||||
Predicate<String> testContentFilter = content -> !content.contains(filterKeyword);
|
||||
DebugOperationHandler testHandler = DebugOperationHandler.create(c2Client, singletonList(bundleFile), testContentFilter, operandPropertiesProvider);
|
||||
TransferDebugOperationHandler testHandler = TransferDebugOperationHandler.create(c2Client, operandPropertiesProvider, singletonList(bundleFile), testContentFilter);
|
||||
C2Operation c2Operation = operation(C2_DEBUG_UPLOAD_ENDPOINT);
|
||||
|
||||
// when
|
|
@ -0,0 +1,212 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.nifi.c2.client.service.operation;
|
||||
|
||||
import static java.lang.Boolean.FALSE;
|
||||
import static java.lang.Boolean.TRUE;
|
||||
import static java.lang.Boolean.parseBoolean;
|
||||
import static java.util.Optional.empty;
|
||||
import static org.apache.nifi.c2.client.service.operation.UpdateAssetOperationHandler.ASSET_FILE_KEY;
|
||||
import static org.apache.nifi.c2.client.service.operation.UpdateAssetOperationHandler.ASSET_FILE_NAME_NOT_FOUND;
|
||||
import static org.apache.nifi.c2.client.service.operation.UpdateAssetOperationHandler.ASSET_FORCE_DOWNLOAD_KEY;
|
||||
import static org.apache.nifi.c2.client.service.operation.UpdateAssetOperationHandler.ASSET_URL_KEY;
|
||||
import static org.apache.nifi.c2.client.service.operation.UpdateAssetOperationHandler.C2_CALLBACK_URL_NOT_FOUND;
|
||||
import static org.apache.nifi.c2.client.service.operation.UpdateAssetOperationHandler.FAILED_TO_PERSIST_ASSET_TO_DISK;
|
||||
import static org.apache.nifi.c2.client.service.operation.UpdateAssetOperationHandler.SUCCESSFULLY_UPDATE_ASSET;
|
||||
import static org.apache.nifi.c2.client.service.operation.UpdateAssetOperationHandler.UPDATE_ASSET_PRECONDITIONS_WERE_NOT_MET;
|
||||
import static org.apache.nifi.c2.client.service.operation.UpdateAssetOperationHandler.UPDATE_ASSET_RETRIEVAL_RESULTED_IN_EMPTY_CONTENT;
|
||||
import static org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.FULLY_APPLIED;
|
||||
import static org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.NOT_APPLIED;
|
||||
import static org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.NO_OPERATION;
|
||||
import static org.apache.nifi.c2.protocol.api.OperandType.ASSET;
|
||||
import static org.apache.nifi.c2.protocol.api.OperationType.UPDATE;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.function.BiFunction;
|
||||
import java.util.function.BiPredicate;
|
||||
import java.util.stream.Stream;
|
||||
import org.apache.nifi.c2.client.api.C2Client;
|
||||
import org.apache.nifi.c2.protocol.api.C2Operation;
|
||||
import org.apache.nifi.c2.protocol.api.C2OperationAck;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.junit.jupiter.params.ParameterizedTest;
|
||||
import org.junit.jupiter.params.provider.Arguments;
|
||||
import org.junit.jupiter.params.provider.MethodSource;
|
||||
import org.mockito.InjectMocks;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.junit.jupiter.MockitoExtension;
|
||||
|
||||
@ExtendWith(MockitoExtension.class)
|
||||
public class UpdateAssetOperationHandlerTest {
|
||||
|
||||
private static final String OPERATION_ID = "operationId";
|
||||
private static final String ASSET_URL = "http://c2.server/asset/1";
|
||||
private static final String ASSET_FILE_NAME = "asset.file";
|
||||
private static final String FORCE_DOWNLOAD = "true";
|
||||
|
||||
@Mock
|
||||
private C2Client c2Client;
|
||||
|
||||
@Mock
|
||||
private OperandPropertiesProvider operandPropertiesProvider;
|
||||
|
||||
@Mock
|
||||
private BiPredicate<String, Boolean> assetUpdatePrecondition;
|
||||
|
||||
@Mock
|
||||
private BiFunction<String, byte[], Boolean> assetPersistFunction;
|
||||
|
||||
@InjectMocks
|
||||
private UpdateAssetOperationHandler testHandler;
|
||||
|
||||
private static Stream<Arguments> invalidConstructorArguments() {
|
||||
return Stream.of(
|
||||
Arguments.of(null, null, null, null),
|
||||
Arguments.of(mock(C2Client.class), null, null, null),
|
||||
Arguments.of(mock(C2Client.class), mock(OperandPropertiesProvider.class), null, null),
|
||||
Arguments.of(mock(C2Client.class), mock(OperandPropertiesProvider.class), mock(BiPredicate.class), null));
|
||||
}
|
||||
|
||||
@ParameterizedTest(name = "c2Client={0} operandPropertiesProvider={1} bundleFileList={2} contentFilter={3}")
|
||||
@MethodSource("invalidConstructorArguments")
|
||||
public void testAttemptingCreateWithInvalidParametersWillThrowException(C2Client c2Client, OperandPropertiesProvider operandPropertiesProvider,
|
||||
BiPredicate<String, Boolean> assetUpdatePrecondition,
|
||||
BiFunction<String, byte[], Boolean> assetPersistFunction) {
|
||||
assertThrows(IllegalArgumentException.class, () -> UpdateAssetOperationHandler.create(c2Client, operandPropertiesProvider, assetUpdatePrecondition, assetPersistFunction));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testOperationAndOperandTypesAreMatching() {
|
||||
assertEquals(UPDATE, testHandler.getOperationType());
|
||||
assertEquals(ASSET, testHandler.getOperandType());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAssetUrlCanNotBeNull() {
|
||||
// given
|
||||
C2Operation operation = operation(null, ASSET_FILE_NAME, FORCE_DOWNLOAD);
|
||||
|
||||
// when
|
||||
C2OperationAck result = testHandler.handle(operation);
|
||||
|
||||
// then
|
||||
assertEquals(OPERATION_ID, result.getOperationId());
|
||||
assertEquals(NOT_APPLIED, result.getOperationState().getState());
|
||||
assertEquals(C2_CALLBACK_URL_NOT_FOUND, result.getOperationState().getDetails());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAssetFileNameCanNotBeNull() {
|
||||
// given
|
||||
C2Operation operation = operation(ASSET_URL, null, FORCE_DOWNLOAD);
|
||||
|
||||
// when
|
||||
C2OperationAck result = testHandler.handle(operation);
|
||||
|
||||
// then
|
||||
assertEquals(OPERATION_ID, result.getOperationId());
|
||||
assertEquals(NOT_APPLIED, result.getOperationState().getState());
|
||||
assertEquals(ASSET_FILE_NAME_NOT_FOUND, result.getOperationState().getDetails());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAssetPreconditionIsNotMet() {
|
||||
// given
|
||||
C2Operation operation = operation(ASSET_URL, ASSET_FILE_NAME, FORCE_DOWNLOAD);
|
||||
when(assetUpdatePrecondition.test(ASSET_FILE_NAME, parseBoolean(FORCE_DOWNLOAD))).thenReturn(FALSE);
|
||||
|
||||
// when
|
||||
C2OperationAck result = testHandler.handle(operation);
|
||||
|
||||
// then
|
||||
assertEquals(OPERATION_ID, result.getOperationId());
|
||||
assertEquals(NO_OPERATION, result.getOperationState().getState());
|
||||
assertEquals(UPDATE_ASSET_PRECONDITIONS_WERE_NOT_MET, result.getOperationState().getDetails());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAssetRetrieveReturnsEmptyResult() {
|
||||
// given
|
||||
C2Operation operation = operation(ASSET_URL, ASSET_FILE_NAME, FORCE_DOWNLOAD);
|
||||
when(assetUpdatePrecondition.test(ASSET_FILE_NAME, parseBoolean(FORCE_DOWNLOAD))).thenReturn(TRUE);
|
||||
when(c2Client.retrieveUpdateContent(ASSET_URL)).thenReturn(empty());
|
||||
|
||||
// when
|
||||
C2OperationAck result = testHandler.handle(operation);
|
||||
|
||||
// then
|
||||
assertEquals(OPERATION_ID, result.getOperationId());
|
||||
assertEquals(NOT_APPLIED, result.getOperationState().getState());
|
||||
assertEquals(UPDATE_ASSET_RETRIEVAL_RESULTED_IN_EMPTY_CONTENT, result.getOperationState().getDetails());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAssetPersistFunctionFails() {
|
||||
// given
|
||||
C2Operation operation = operation(ASSET_URL, ASSET_FILE_NAME, FORCE_DOWNLOAD);
|
||||
when(assetUpdatePrecondition.test(ASSET_FILE_NAME, parseBoolean(FORCE_DOWNLOAD))).thenReturn(TRUE);
|
||||
byte[] mockUpdateContent = new byte[0];
|
||||
when(c2Client.retrieveUpdateContent(ASSET_URL)).thenReturn(Optional.of(mockUpdateContent));
|
||||
when(assetPersistFunction.apply(ASSET_FILE_NAME, mockUpdateContent)).thenReturn(FALSE);
|
||||
|
||||
// when
|
||||
C2OperationAck result = testHandler.handle(operation);
|
||||
|
||||
// then
|
||||
assertEquals(OPERATION_ID, result.getOperationId());
|
||||
assertEquals(NOT_APPLIED, result.getOperationState().getState());
|
||||
assertEquals(FAILED_TO_PERSIST_ASSET_TO_DISK, result.getOperationState().getDetails());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAssetIsDownloadedAndPersistedSuccessfully() {
|
||||
// given
|
||||
C2Operation operation = operation(ASSET_URL, ASSET_FILE_NAME, FORCE_DOWNLOAD);
|
||||
when(assetUpdatePrecondition.test(ASSET_FILE_NAME, parseBoolean(FORCE_DOWNLOAD))).thenReturn(TRUE);
|
||||
byte[] mockUpdateContent = new byte[0];
|
||||
when(c2Client.retrieveUpdateContent(ASSET_URL)).thenReturn(Optional.of(mockUpdateContent));
|
||||
when(assetPersistFunction.apply(ASSET_FILE_NAME, mockUpdateContent)).thenReturn(TRUE);
|
||||
|
||||
// when
|
||||
C2OperationAck result = testHandler.handle(operation);
|
||||
|
||||
// then
|
||||
assertEquals(OPERATION_ID, result.getOperationId());
|
||||
assertEquals(FULLY_APPLIED, result.getOperationState().getState());
|
||||
assertEquals(SUCCESSFULLY_UPDATE_ASSET, result.getOperationState().getDetails());
|
||||
}
|
||||
|
||||
private C2Operation operation(String assetUrl, String assetFile, String forceDownload) {
|
||||
C2Operation c2Operation = new C2Operation();
|
||||
c2Operation.setIdentifier(OPERATION_ID);
|
||||
|
||||
Map<String, String> arguments = new HashMap<>();
|
||||
arguments.put(ASSET_URL_KEY, assetUrl);
|
||||
arguments.put(ASSET_FILE_KEY, assetFile);
|
||||
arguments.put(ASSET_FORCE_DOWNLOAD_KEY, forceDownload);
|
||||
c2Operation.setArgs(arguments);
|
||||
return c2Operation;
|
||||
}
|
||||
}
|
|
@ -74,6 +74,7 @@ public class C2OperationState implements Serializable {
|
|||
FULLY_APPLIED,
|
||||
PARTIALLY_APPLIED,
|
||||
OPERATION_NOT_UNDERSTOOD,
|
||||
NO_OPERATION,
|
||||
NOT_APPLIED;
|
||||
|
||||
/**
|
||||
|
@ -93,6 +94,8 @@ public class C2OperationState implements Serializable {
|
|||
case 2:
|
||||
return OPERATION_NOT_UNDERSTOOD;
|
||||
case 3:
|
||||
return NO_OPERATION;
|
||||
case 4:
|
||||
default:
|
||||
return NOT_APPLIED;
|
||||
}
|
||||
|
@ -114,9 +117,11 @@ public class C2OperationState implements Serializable {
|
|||
return 1;
|
||||
case OPERATION_NOT_UNDERSTOOD:
|
||||
return 2;
|
||||
case NO_OPERATION:
|
||||
return 3;
|
||||
case NOT_APPLIED:
|
||||
default:
|
||||
return 3;
|
||||
return 4;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -26,7 +26,8 @@ public enum OperandType {
|
|||
CONNECTION,
|
||||
DEBUG,
|
||||
MANIFEST,
|
||||
REPOSITORY;
|
||||
REPOSITORY,
|
||||
ASSET;
|
||||
|
||||
public static Optional<OperandType> fromString(String value) {
|
||||
return Arrays.stream(values())
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
|
||||
package org.apache.nifi.c2.protocol.api;
|
||||
|
||||
import static org.apache.nifi.c2.protocol.api.OperandType.ASSET;
|
||||
import static org.apache.nifi.c2.protocol.api.OperandType.CONFIGURATION;
|
||||
import static org.apache.nifi.c2.protocol.api.OperandType.CONNECTION;
|
||||
import static org.apache.nifi.c2.protocol.api.OperandType.DEBUG;
|
||||
|
@ -35,7 +36,7 @@ public enum OperationType {
|
|||
// C2 Server -> C2 Client Commands
|
||||
CLEAR(CONNECTION),
|
||||
DESCRIBE(MANIFEST),
|
||||
UPDATE(CONFIGURATION),
|
||||
UPDATE(CONFIGURATION, ASSET),
|
||||
RESTART,
|
||||
START,
|
||||
STOP,
|
||||
|
|
|
@ -151,6 +151,8 @@ java.arg.14=-Djava.awt.headless=true
|
|||
#c2.agent.identifier=
|
||||
# If set to false heartbeat won't contain the manifest. Defaults to true.
|
||||
#c2.full.heartbeat=false
|
||||
# Directory for storing assets downloaded via C2 update/asset command
|
||||
#c2.asset.directory=./asset
|
||||
## Define TLS security properties for C2 communications
|
||||
#c2.security.truststore.location=
|
||||
#c2.security.truststore.password=
|
||||
|
@ -162,4 +164,4 @@ java.arg.14=-Djava.awt.headless=true
|
|||
# The following ingestor configuration needs to be enabled in order to apply configuration updates coming from C2 server
|
||||
#nifi.minifi.notifier.ingestors=org.apache.nifi.minifi.bootstrap.configuration.ingestors.FileChangeIngestor
|
||||
#nifi.minifi.notifier.ingestors.file.config.path=./conf/config-new.yml
|
||||
#nifi.minifi.notifier.ingestors.file.polling.period.seconds=5
|
||||
#nifi.minifi.notifier.ingestors.file.polling.period.seconds=5
|
||||
|
|
|
@ -40,6 +40,7 @@ public class C2NiFiProperties {
|
|||
public static final String C2_AGENT_IDENTIFIER_KEY = C2_PREFIX + "agent.identifier";
|
||||
public static final String C2_FULL_HEARTBEAT_KEY = C2_PREFIX + "full.heartbeat";
|
||||
public static final String C2_REQUEST_COMPRESSION_KEY = C2_PREFIX + "request.compression";
|
||||
public static final String C2_ASSET_DIRECTORY_KEY = C2_PREFIX + "asset.directory";
|
||||
|
||||
public static final String C2_ROOT_CLASS_DEFINITIONS_KEY = C2_PREFIX + "root.class.definitions";
|
||||
public static final String C2_METRICS_NAME_KEY = C2_ROOT_CLASS_DEFINITIONS_KEY + ".metrics.name";
|
||||
|
@ -73,4 +74,6 @@ public class C2NiFiProperties {
|
|||
|
||||
// C2 request compression is turned off by default
|
||||
public static final String C2_REQUEST_COMPRESSION= "none";
|
||||
|
||||
public static final String C2_ASSET_DIRECTORY = "./asset";
|
||||
}
|
||||
|
|
|
@ -18,28 +18,19 @@
|
|||
package org.apache.nifi.c2;
|
||||
|
||||
import static java.util.Optional.ofNullable;
|
||||
import static java.util.stream.Collectors.collectingAndThen;
|
||||
import static java.util.stream.Collectors.toList;
|
||||
import static java.util.stream.Collectors.toSet;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.Paths;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ScheduledThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.Predicate;
|
||||
import java.util.stream.Stream;
|
||||
import org.apache.nifi.c2.client.C2ClientConfig;
|
||||
import org.apache.nifi.c2.client.http.C2HttpClient;
|
||||
import org.apache.nifi.c2.client.service.C2ClientService;
|
||||
|
@ -48,12 +39,15 @@ import org.apache.nifi.c2.client.service.FlowIdHolder;
|
|||
import org.apache.nifi.c2.client.service.ManifestHashProvider;
|
||||
import org.apache.nifi.c2.client.service.model.RuntimeInfoWrapper;
|
||||
import org.apache.nifi.c2.client.service.operation.C2OperationService;
|
||||
import org.apache.nifi.c2.client.service.operation.DebugOperationHandler;
|
||||
import org.apache.nifi.c2.client.service.operation.DescribeManifestOperationHandler;
|
||||
import org.apache.nifi.c2.client.service.operation.EmptyOperandPropertiesProvider;
|
||||
import org.apache.nifi.c2.client.service.operation.OperandPropertiesProvider;
|
||||
import org.apache.nifi.c2.client.service.operation.SupportedOperationsProvider;
|
||||
import org.apache.nifi.c2.client.service.operation.TransferDebugOperationHandler;
|
||||
import org.apache.nifi.c2.client.service.operation.UpdateAssetOperationHandler;
|
||||
import org.apache.nifi.c2.client.service.operation.UpdateConfigurationOperationHandler;
|
||||
import org.apache.nifi.c2.command.TransferDebugCommandHelper;
|
||||
import org.apache.nifi.c2.command.UpdateAssetCommandHelper;
|
||||
import org.apache.nifi.c2.protocol.api.AgentManifest;
|
||||
import org.apache.nifi.c2.protocol.api.AgentRepositories;
|
||||
import org.apache.nifi.c2.protocol.api.AgentRepositoryStatus;
|
||||
|
@ -76,22 +70,6 @@ import org.slf4j.LoggerFactory;
|
|||
|
||||
public class C2NifiClientService {
|
||||
|
||||
public static final Set<String> SENSITIVE_PROPERTY_KEYWORDS =
|
||||
Stream.of("key:", "algorithm:", "secret.key", "sensitive.props.key", "sensitive.props.algorithm", "secret", "password", "passwd")
|
||||
.map(String::toLowerCase)
|
||||
.collect(collectingAndThen(toSet(), Collections::unmodifiableSet));
|
||||
public static final Predicate<String> EXCLUDE_SENSITIVE_TEXT = text ->
|
||||
ofNullable(text)
|
||||
.map(String::toLowerCase)
|
||||
.map(t -> SENSITIVE_PROPERTY_KEYWORDS.stream().noneMatch(keyword -> t.contains(keyword)))
|
||||
.orElse(true);
|
||||
|
||||
private static final String MINIFI_CONFIG_FILE_PATH = "nifi.minifi.config.file";
|
||||
private static final String MINIFI_BOOTSTRAP_FILE_PATH = "nifi.minifi.bootstrap.file";
|
||||
private static final String MINIFI_LOG_DIRECTORY = "nifi.minifi.log.directory";
|
||||
private static final String MINIFI_APP_LOG_FILE = "nifi.minifi.app.log.file";
|
||||
private static final String MINIFI_BOOTSTRAP_LOG_FILE = "nifi.minifi.bootstrap.log.file";
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(C2NifiClientService.class);
|
||||
|
||||
private static final String DEFAULT_CONF_DIR = "./conf";
|
||||
|
@ -124,19 +102,22 @@ public class C2NifiClientService {
|
|||
);
|
||||
this.heartbeatPeriod = clientConfig.getHeartbeatPeriod();
|
||||
this.flowController = flowController;
|
||||
|
||||
C2HttpClient client = new C2HttpClient(clientConfig, new C2JacksonSerializer());
|
||||
C2HeartbeatFactory heartbeatFactory = new C2HeartbeatFactory(clientConfig, flowIdHolder, new ManifestHashProvider());
|
||||
OperandPropertiesProvider emptyOperandPropertiesProvider = new EmptyOperandPropertiesProvider();
|
||||
TransferDebugCommandHelper transferDebugCommandHelper = new TransferDebugCommandHelper(niFiProperties);
|
||||
UpdateAssetCommandHelper updateAssetCommandHelper = new UpdateAssetCommandHelper(clientConfig.getC2AssetDirectory());
|
||||
updateAssetCommandHelper.createAssetDirectory();
|
||||
C2OperationService c2OperationService = new C2OperationService(Arrays.asList(
|
||||
new UpdateConfigurationOperationHandler(client, flowIdHolder, this::updateFlowContent, emptyOperandPropertiesProvider),
|
||||
new DescribeManifestOperationHandler(heartbeatFactory, this::generateRuntimeInfo, emptyOperandPropertiesProvider),
|
||||
DebugOperationHandler.create(client, debugBundleFiles(niFiProperties), EXCLUDE_SENSITIVE_TEXT, emptyOperandPropertiesProvider)
|
||||
TransferDebugOperationHandler.create(client, emptyOperandPropertiesProvider,
|
||||
transferDebugCommandHelper.debugBundleFiles(), transferDebugCommandHelper::excludeSensitiveText),
|
||||
UpdateAssetOperationHandler.create(client, emptyOperandPropertiesProvider,
|
||||
updateAssetCommandHelper::assetUpdatePrecondition, updateAssetCommandHelper::assetPersistFunction)
|
||||
));
|
||||
this.c2ClientService = new C2ClientService(
|
||||
client,
|
||||
heartbeatFactory,
|
||||
c2OperationService
|
||||
);
|
||||
this.c2ClientService = new C2ClientService(client, heartbeatFactory, c2OperationService);
|
||||
this.supportedOperationsProvider = new SupportedOperationsProvider(c2OperationService.getHandlers());
|
||||
}
|
||||
|
||||
|
@ -155,7 +136,8 @@ public class C2NifiClientService {
|
|||
C2NiFiProperties.C2_DEFAULT_CALL_TIMEOUT), TimeUnit.MILLISECONDS))
|
||||
.c2Url(properties.getProperty(C2NiFiProperties.C2_REST_URL_KEY, ""))
|
||||
.c2RequestCompression(properties.getProperty(C2NiFiProperties.C2_REQUEST_COMPRESSION_KEY, C2NiFiProperties.C2_REQUEST_COMPRESSION))
|
||||
.confDirectory(properties.getProperty(C2NiFiProperties.C2_CONFIG_DIRECTORY_KEY, DEFAULT_CONF_DIR))
|
||||
.c2AssetDirectory(properties.getProperty(C2NiFiProperties.C2_ASSET_DIRECTORY_KEY, C2NiFiProperties.C2_ASSET_DIRECTORY))
|
||||
.confDirectory(properties.getProperty(C2NiFiProperties.C2_CONFIG_DIRECTORY_KEY, DEFAULT_CONF_DIR))
|
||||
.runtimeManifestIdentifier(properties.getProperty(C2NiFiProperties.C2_RUNTIME_MANIFEST_IDENTIFIER_KEY, ""))
|
||||
.runtimeType(properties.getProperty(C2NiFiProperties.C2_RUNTIME_TYPE_KEY, ""))
|
||||
.c2AckUrl(properties.getProperty(C2NiFiProperties.C2_REST_URL_ACK_KEY, ""))
|
||||
|
@ -251,15 +233,4 @@ public class C2NifiClientService {
|
|||
.map(parentDir -> new File(parentDir + TARGET_CONFIG_FILE))
|
||||
.orElse(new File(DEFAULT_CONF_DIR + TARGET_CONFIG_FILE));
|
||||
}
|
||||
|
||||
private List<Path> debugBundleFiles(NiFiProperties properties) {
|
||||
return Stream.of(
|
||||
Paths.get(properties.getProperty(MINIFI_CONFIG_FILE_PATH)),
|
||||
Paths.get(properties.getProperty(MINIFI_BOOTSTRAP_FILE_PATH)),
|
||||
Paths.get(properties.getProperty(MINIFI_LOG_DIRECTORY), properties.getProperty(MINIFI_APP_LOG_FILE)),
|
||||
Paths.get(properties.getProperty(MINIFI_LOG_DIRECTORY), properties.getProperty(MINIFI_BOOTSTRAP_LOG_FILE)))
|
||||
.filter(Files::exists)
|
||||
.filter(Files::isRegularFile)
|
||||
.collect(toList());
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,70 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.nifi.c2.command;
|
||||
|
||||
import static java.util.Optional.ofNullable;
|
||||
import static java.util.stream.Collectors.collectingAndThen;
|
||||
import static java.util.stream.Collectors.toList;
|
||||
import static java.util.stream.Collectors.toSet;
|
||||
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.Paths;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Stream;
|
||||
import org.apache.nifi.util.NiFiProperties;
|
||||
|
||||
public class TransferDebugCommandHelper {
|
||||
|
||||
private static final String MINIFI_CONFIG_FILE_PATH = "nifi.minifi.config.file";
|
||||
private static final String MINIFI_BOOTSTRAP_FILE_PATH = "nifi.minifi.bootstrap.file";
|
||||
private static final String MINIFI_LOG_DIRECTORY = "nifi.minifi.log.directory";
|
||||
private static final String MINIFI_APP_LOG_FILE = "nifi.minifi.app.log.file";
|
||||
private static final String MINIFI_BOOTSTRAP_LOG_FILE = "nifi.minifi.bootstrap.log.file";
|
||||
|
||||
private static final Set<String> SENSITIVE_PROPERTY_KEYWORDS =
|
||||
Stream.of("key:", "algorithm:", "secret.key", "sensitive.props.key", "sensitive.props.algorithm", "secret", "password", "passwd")
|
||||
.map(String::toLowerCase)
|
||||
.collect(collectingAndThen(toSet(), Collections::unmodifiableSet));
|
||||
|
||||
private final NiFiProperties niFiProperties;
|
||||
|
||||
public TransferDebugCommandHelper(NiFiProperties niFiProperties) {
|
||||
this.niFiProperties = niFiProperties;
|
||||
}
|
||||
|
||||
public List<Path> debugBundleFiles() {
|
||||
return Stream.of(
|
||||
Paths.get(niFiProperties.getProperty(MINIFI_CONFIG_FILE_PATH)),
|
||||
Paths.get(niFiProperties.getProperty(MINIFI_BOOTSTRAP_FILE_PATH)),
|
||||
Paths.get(niFiProperties.getProperty(MINIFI_LOG_DIRECTORY), niFiProperties.getProperty(MINIFI_APP_LOG_FILE)),
|
||||
Paths.get(niFiProperties.getProperty(MINIFI_LOG_DIRECTORY), niFiProperties.getProperty(MINIFI_BOOTSTRAP_LOG_FILE)))
|
||||
.filter(Files::exists)
|
||||
.filter(Files::isRegularFile)
|
||||
.collect(toList());
|
||||
}
|
||||
|
||||
public boolean excludeSensitiveText(String text) {
|
||||
return ofNullable(text)
|
||||
.map(String::toLowerCase)
|
||||
.map(t -> SENSITIVE_PROPERTY_KEYWORDS.stream().noneMatch(keyword -> t.contains(keyword)))
|
||||
.orElse(true);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,69 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.nifi.c2.command;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.UncheckedIOException;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.Paths;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
public class UpdateAssetCommandHelper {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(UpdateAssetCommandHelper.class);
|
||||
|
||||
private final String assetDirectory;
|
||||
|
||||
public UpdateAssetCommandHelper(String assetDirectory) {
|
||||
this.assetDirectory = assetDirectory;
|
||||
}
|
||||
|
||||
public void createAssetDirectory() {
|
||||
try {
|
||||
Files.createDirectories(Paths.get(assetDirectory));
|
||||
} catch (IOException ioe) {
|
||||
LOG.error("Unable to create asset directory {}", assetDirectory);
|
||||
throw new UncheckedIOException("Unable to create directory", ioe);
|
||||
}
|
||||
}
|
||||
|
||||
public boolean assetUpdatePrecondition(String assetFileName, Boolean forceDownload) {
|
||||
Path assetPath = Paths.get(assetDirectory, assetFileName);
|
||||
if (Files.exists(assetPath) && !forceDownload) {
|
||||
LOG.info("Asset file already exists on path {}. Asset won't be downloaded", assetPath);
|
||||
return false;
|
||||
}
|
||||
LOG.info("Asset file does not exist or force download is on. Asset will be downloaded to {}", assetPath);
|
||||
return true;
|
||||
}
|
||||
|
||||
public boolean assetPersistFunction(String assetFileName, byte[] assetBinary) {
|
||||
Path assetPath = Paths.get(assetDirectory, assetFileName);
|
||||
try {
|
||||
Files.deleteIfExists(assetPath);
|
||||
Files.write(assetPath, assetBinary);
|
||||
LOG.info("Asset was persisted to {}, {} bytes were written", assetPath, assetBinary.length);
|
||||
return true;
|
||||
} catch (IOException e) {
|
||||
LOG.error("Persisting asset failed. File creation was not successful targeting {}", assetPath, e);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -14,17 +14,27 @@
|
|||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.c2;
|
||||
|
||||
import static org.apache.nifi.c2.C2NifiClientService.EXCLUDE_SENSITIVE_TEXT;
|
||||
package org.apache.nifi.c2.command;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
|
||||
import java.util.stream.Stream;
|
||||
import org.apache.nifi.util.NiFiProperties;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.params.ParameterizedTest;
|
||||
import org.junit.jupiter.params.provider.Arguments;
|
||||
import org.junit.jupiter.params.provider.MethodSource;
|
||||
|
||||
public class C2NifiClientServiceTest {
|
||||
public class TransferDebugCommandHelperTest {
|
||||
|
||||
private TransferDebugCommandHelper transferDebugCommandHelper;
|
||||
|
||||
@BeforeEach
|
||||
public void setUp() {
|
||||
NiFiProperties niFiProperties = new NiFiProperties();
|
||||
transferDebugCommandHelper = new TransferDebugCommandHelper(niFiProperties);
|
||||
}
|
||||
|
||||
private static Stream<Arguments> textAndExpectedResult() {
|
||||
return Stream.of(
|
||||
|
@ -51,6 +61,6 @@ public class C2NifiClientServiceTest {
|
|||
@ParameterizedTest
|
||||
@MethodSource("textAndExpectedResult")
|
||||
public void testSensitiveTextIsExcluded(String propertyName, boolean expectedResult) {
|
||||
assertEquals(expectedResult, EXCLUDE_SENSITIVE_TEXT.test(propertyName));
|
||||
assertEquals(expectedResult, transferDebugCommandHelper.excludeSensitiveText(propertyName));
|
||||
}
|
||||
}
|
|
@ -0,0 +1,142 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.nifi.c2.command;
|
||||
|
||||
import static java.lang.Boolean.FALSE;
|
||||
import static java.lang.Boolean.TRUE;
|
||||
import static java.nio.charset.Charset.defaultCharset;
|
||||
import static java.nio.charset.StandardCharsets.UTF_8;
|
||||
import static java.nio.file.Files.exists;
|
||||
import static java.nio.file.Files.isDirectory;
|
||||
import static java.nio.file.Files.readAllLines;
|
||||
import static java.nio.file.Files.write;
|
||||
import static java.util.Collections.singletonList;
|
||||
import static org.apache.commons.lang3.StringUtils.EMPTY;
|
||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||
import static org.junit.jupiter.api.Assertions.assertIterableEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.io.UncheckedIOException;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.Paths;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.io.TempDir;
|
||||
|
||||
public class UpdateAssetCommandHelperTest {
|
||||
|
||||
private static final String ASSET_DIRECTORY = "asset_directory";
|
||||
private static final String ASSET_FILE = "asset.file";
|
||||
|
||||
@TempDir
|
||||
private File tempDir;
|
||||
|
||||
private Path assetDirectory;
|
||||
private UpdateAssetCommandHelper updateAssetCommandHelper;
|
||||
|
||||
@BeforeEach
|
||||
public void setUp() {
|
||||
assetDirectory = Paths.get(tempDir.getAbsolutePath(), ASSET_DIRECTORY);
|
||||
updateAssetCommandHelper = new UpdateAssetCommandHelper(assetDirectory.toString());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAssetDirectoryShouldBeCreated() {
|
||||
// given + when
|
||||
updateAssetCommandHelper.createAssetDirectory();
|
||||
|
||||
// then
|
||||
assertTrue(exists(assetDirectory));
|
||||
assertTrue(isDirectory(assetDirectory));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAssetFileDoesNotExist() {
|
||||
// given
|
||||
updateAssetCommandHelper.createAssetDirectory();
|
||||
|
||||
// when
|
||||
boolean result = updateAssetCommandHelper.assetUpdatePrecondition(ASSET_FILE, FALSE);
|
||||
|
||||
// then
|
||||
assertTrue(result);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAssetFileExistsAndForceDownloadOff() {
|
||||
// given
|
||||
updateAssetCommandHelper.createAssetDirectory();
|
||||
touchAssetFile();
|
||||
|
||||
// when
|
||||
boolean result = updateAssetCommandHelper.assetUpdatePrecondition(ASSET_FILE, FALSE);
|
||||
|
||||
// then
|
||||
assertFalse(result);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAssetFileExistsAndForceDownloadOn() {
|
||||
// given
|
||||
updateAssetCommandHelper.createAssetDirectory();
|
||||
touchAssetFile();
|
||||
|
||||
// when
|
||||
boolean result = updateAssetCommandHelper.assetUpdatePrecondition(ASSET_FILE, TRUE);
|
||||
|
||||
// then
|
||||
assertTrue(result);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAssetPersistedCorrectly() throws IOException {
|
||||
// given
|
||||
updateAssetCommandHelper.createAssetDirectory();
|
||||
String testAssetContent = "test file content";
|
||||
|
||||
// when
|
||||
boolean result = updateAssetCommandHelper.assetPersistFunction(ASSET_FILE, testAssetContent.getBytes(defaultCharset()));
|
||||
|
||||
// then
|
||||
assertTrue(result);
|
||||
assertIterableEquals(singletonList(testAssetContent), readAllLines(assetDirectory.resolve(ASSET_FILE), defaultCharset()));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAssetDirectoryDoesNotExistWhenPersistingAsset() {
|
||||
// given
|
||||
String testAssetContent = "test file content";
|
||||
|
||||
// when
|
||||
boolean result = updateAssetCommandHelper.assetPersistFunction(ASSET_FILE, testAssetContent.getBytes(defaultCharset()));
|
||||
|
||||
// then
|
||||
assertFalse(result);
|
||||
assertFalse(exists(assetDirectory.resolve(ASSET_FILE)));
|
||||
}
|
||||
|
||||
private void touchAssetFile() {
|
||||
try {
|
||||
write(Paths.get(assetDirectory.toString(), ASSET_FILE), EMPTY.getBytes(UTF_8));
|
||||
} catch (IOException e) {
|
||||
throw new UncheckedIOException("Failed to touch file", e);
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue