NIFI-10458 MiNiFi: Add C2 handler for Transfer/Debug operation

This closes #6434

Signed-off-by: Ferenc Erdei <erdei.ferenc90@gmail.com>
This commit is contained in:
Ferenc Kis 2022-09-16 17:23:09 +02:00 committed by Ferenc Erdei
parent d29f67465a
commit 16bcb8f145
No known key found for this signature in database
GPG Key ID: 023D856C60E92F96
16 changed files with 859 additions and 60 deletions

View File

@ -35,7 +35,7 @@ public interface C2Client {
Optional<C2HeartbeatResponse> publishHeartbeat(C2Heartbeat heartbeat);
/**
* Retrive the content of the new flow from the C2 Server
* Retrieve the content of the new flow from the C2 Server
*
* @param flowUpdateUrl url where the content should be downloaded from
* @return the actual downloaded content. Will be empty if no content can be downloaded
@ -48,4 +48,13 @@ public interface C2Client {
* @param operationAck the acknowledge details to be sent
*/
void acknowledgeOperation(C2OperationAck operationAck);
/**
* Uploads a binary bundle to C2 server
*
* @param callbackUrl url where the content should be uploaded to
* @param bundle bundle content as byte array to be uploaded
* @return optional error message if any issues occurred
*/
Optional<String> uploadBundle(String callbackUrl, byte[] bundle);
}

View File

@ -17,6 +17,9 @@
package org.apache.nifi.c2.client.http;
import static okhttp3.MultipartBody.FORM;
import static okhttp3.RequestBody.create;
import java.io.FileInputStream;
import java.io.IOException;
import java.security.KeyStore;
@ -31,9 +34,9 @@ import javax.net.ssl.TrustManager;
import javax.net.ssl.TrustManagerFactory;
import javax.net.ssl.X509TrustManager;
import okhttp3.MediaType;
import okhttp3.MultipartBody;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.Response;
import okhttp3.ResponseBody;
import okhttp3.logging.HttpLoggingInterceptor;
@ -51,6 +54,9 @@ public class C2HttpClient implements C2Client {
static final MediaType MEDIA_TYPE_APPLICATION_JSON = MediaType.parse("application/json");
private static final Logger logger = LoggerFactory.getLogger(C2HttpClient.class);
private static final String MULTIPART_FORM_FILE_FIELD_NAME = "file";
private static final String BUNDLE_FILE_NAME = "debug.tar.gz";
private static final MediaType BUNDLE_MIME_TYPE = MediaType.parse("application/gzip");
private final AtomicReference<OkHttpClient> httpClientReference = new AtomicReference<>();
private final C2ClientConfig clientConfig;
@ -123,18 +129,41 @@ public class C2HttpClient implements C2Client {
@Override
public void acknowledgeOperation(C2OperationAck operationAck) {
logger.info("Acknowledging Operation [{}] C2 URL [{}]", operationAck.getOperationId(), clientConfig.getC2AckUrl());
logger.info("Acknowledging Operation {} to C2 server {}", operationAck.getOperationId(), clientConfig.getC2AckUrl());
serializer.serialize(operationAck)
.map(operationAckBody -> RequestBody.create(operationAckBody, MEDIA_TYPE_APPLICATION_JSON))
.map(operationAckBody -> create(operationAckBody, MEDIA_TYPE_APPLICATION_JSON))
.map(requestBody -> new Request.Builder().post(requestBody).url(clientConfig.getC2AckUrl()).build())
.map(C2RequestCompression.forType(clientConfig.getC2RequestCompression())::compress)
.ifPresent(this::sendAck);
}
@Override
public Optional<String> uploadBundle(String callbackUrl, byte[] bundle) {
Request request = new Request.Builder()
.url(callbackUrl)
.post(new MultipartBody.Builder()
.setType(FORM)
.addFormDataPart(MULTIPART_FORM_FILE_FIELD_NAME, BUNDLE_FILE_NAME, create(bundle, BUNDLE_MIME_TYPE))
.build())
.build();
logger.info("Uploading bundle to C2 server {} with size {}", callbackUrl, bundle.length);
try (Response response = httpClientReference.get().newCall(request).execute()) {
if (!response.isSuccessful()) {
logger.error("Upload bundle failed to C2 server {} with status code {}", callbackUrl, response.code());
return Optional.of("Upload bundle failed to C2 server");
}
} catch (IOException e) {
logger.error("Could not upload bundle to C2 server {}", callbackUrl, e);
return Optional.of("Could not upload bundle to C2 server");
}
return Optional.empty();
}
private Optional<C2HeartbeatResponse> sendHeartbeat(String heartbeat) {
Optional<C2HeartbeatResponse> c2HeartbeatResponse = Optional.empty();
Request request = new Request.Builder()
.post(RequestBody.create(heartbeat, MEDIA_TYPE_APPLICATION_JSON))
.post(create(heartbeat, MEDIA_TYPE_APPLICATION_JSON))
.url(clientConfig.getC2Url())
.build();
@ -143,7 +172,7 @@ public class C2HttpClient implements C2Client {
try (Response heartbeatResponse = httpClientReference.get().newCall(decoratedRequest).execute()) {
c2HeartbeatResponse = getResponseBody(heartbeatResponse).flatMap(response -> serializer.deserialize(response, C2HeartbeatResponse.class));
} catch (IOException ce) {
logger.error("Send Heartbeat failed [{}]", clientConfig.getC2Url(), ce);
logger.error("Send Heartbeat failed to C2 server {}", clientConfig.getC2Url(), ce);
}
return c2HeartbeatResponse;
@ -243,10 +272,10 @@ public class C2HttpClient implements C2Client {
private void sendAck(Request request) {
try (Response heartbeatResponse = httpClientReference.get().newCall(request).execute()) {
if (!heartbeatResponse.isSuccessful()) {
logger.warn("Acknowledgement was not successful with c2 server [{}] with status code {}", clientConfig.getC2AckUrl(), heartbeatResponse.code());
logger.warn("Acknowledgement was not successful with C2 server {} with status code {}", clientConfig.getC2AckUrl(), heartbeatResponse.code());
}
} catch (IOException e) {
logger.error("Could not transmit ack to c2 server [{}]", clientConfig.getC2AckUrl(), e);
logger.error("Could not transmit ack to C2 server {}", clientConfig.getC2AckUrl(), e);
}
}
}

View File

@ -43,5 +43,10 @@ limitations under the License.
<artifactId>c2-client-http</artifactId>
<version>1.18.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>
<version>1.21</version>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,193 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.c2.client.service.operation;
import static java.nio.file.Files.copy;
import static java.nio.file.Files.createTempDirectory;
import static java.nio.file.Files.lines;
import static java.nio.file.Files.walk;
import static java.util.Optional.empty;
import static java.util.Optional.ofNullable;
import static org.apache.commons.compress.utils.IOUtils.closeQuietly;
import static org.apache.commons.lang3.StringUtils.EMPTY;
import static org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.FULLY_APPLIED;
import static org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.NOT_APPLIED;
import static org.apache.nifi.c2.protocol.api.OperandType.DEBUG;
import static org.apache.nifi.c2.protocol.api.OperationType.TRANSFER;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Predicate;
import java.util.stream.Stream;
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream;
import org.apache.nifi.c2.client.api.C2Client;
import org.apache.nifi.c2.protocol.api.C2Operation;
import org.apache.nifi.c2.protocol.api.C2OperationAck;
import org.apache.nifi.c2.protocol.api.C2OperationState;
import org.apache.nifi.c2.protocol.api.C2OperationState.OperationState;
import org.apache.nifi.c2.protocol.api.OperandType;
import org.apache.nifi.c2.protocol.api.OperationType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class DebugOperationHandler implements C2OperationHandler {
private static final Logger LOG = LoggerFactory.getLogger(DebugOperationHandler.class);
private static final String C2_CALLBACK_URL_NOT_FOUND = "C2 Server callback URL was not found in request";
private static final String SUCCESSFUL_UPLOAD = "Debug bundle was uploaded successfully";
private static final String UNABLE_TO_CREATE_BUNDLE = "Unable to create debug bundle";
static final String TARGET_ARG = "target";
static final String NEW_LINE = "\n";
private final C2Client c2Client;
private final List<Path> bundleFilePaths;
private final Predicate<String> contentFilter;
private DebugOperationHandler(C2Client c2Client, List<Path> bundleFilePaths, Predicate<String> contentFilter) {
this.c2Client = c2Client;
this.bundleFilePaths = bundleFilePaths;
this.contentFilter = contentFilter;
}
public static DebugOperationHandler create(C2Client c2Client, List<Path> bundleFilePaths, Predicate<String> contentFilter) {
if (c2Client == null) {
throw new IllegalArgumentException("C2Client should not be null");
}
if (bundleFilePaths == null || bundleFilePaths.isEmpty()) {
throw new IllegalArgumentException("bundleFilePaths should not be not null or empty");
}
if (contentFilter == null) {
throw new IllegalArgumentException("Content filter should not be null");
}
return new DebugOperationHandler(c2Client, bundleFilePaths, contentFilter);
}
@Override
public OperationType getOperationType() {
return TRANSFER;
}
@Override
public OperandType getOperandType() {
return DEBUG;
}
@Override
public C2OperationAck handle(C2Operation operation) {
String debugCallbackUrl = operation.getArgs().get(TARGET_ARG);
if (debugCallbackUrl == null) {
LOG.error("Callback URL was not found in C2 request.");
return operationAck(operation, operationState(NOT_APPLIED, C2_CALLBACK_URL_NOT_FOUND));
}
List<Path> contentFilteredFilePaths = null;
C2OperationState operationState;
try {
contentFilteredFilePaths = filterContent(operation.getIdentifier(), bundleFilePaths);
operationState = createDebugBundle(contentFilteredFilePaths)
.map(bundle -> c2Client.uploadBundle(debugCallbackUrl, bundle)
.map(errorMessage -> operationState(NOT_APPLIED, errorMessage))
.orElseGet(() -> operationState(FULLY_APPLIED, SUCCESSFUL_UPLOAD)))
.orElseGet(() -> operationState(NOT_APPLIED, UNABLE_TO_CREATE_BUNDLE));
} catch (Exception e) {
LOG.error("Unexpected error happened", e);
operationState = operationState(NOT_APPLIED, UNABLE_TO_CREATE_BUNDLE);
} finally {
ofNullable(contentFilteredFilePaths).ifPresent(this::cleanup);
}
LOG.debug("Returning operation ack for operation {} with state {} and details {}", operation.getIdentifier(), operationState.getState(), operationState.getDetails());
return operationAck(operation, operationState);
}
private C2OperationAck operationAck(C2Operation operation, C2OperationState state) {
C2OperationAck operationAck = new C2OperationAck();
operationAck.setOperationId(ofNullable(operation.getIdentifier()).orElse(EMPTY));
operationAck.setOperationState(state);
return operationAck;
}
private C2OperationState operationState(OperationState operationState, String details) {
C2OperationState state = new C2OperationState();
state.setState(operationState);
state.setDetails(details);
return state;
}
private List<Path> filterContent(String operationId, List<Path> bundleFilePaths) {
List<Path> contentFilteredFilePaths = new ArrayList<>();
for (Path path : bundleFilePaths) {
String fileName = path.getFileName().toString();
try (Stream<String> fileStream = lines(path)) {
Path tempDirectory = createTempDirectory(operationId);
Path tempFile = Paths.get(tempDirectory.toAbsolutePath().toString(), fileName);
Files.write(tempFile, (Iterable<String>) fileStream.filter(contentFilter)::iterator);
contentFilteredFilePaths.add(tempFile);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
return contentFilteredFilePaths;
}
private Optional<byte[]> createDebugBundle(List<Path> filePaths) {
ByteArrayOutputStream byteOutputStream = new ByteArrayOutputStream();
try (GzipCompressorOutputStream gzipCompressorOutputStream = new GzipCompressorOutputStream(byteOutputStream);
TarArchiveOutputStream tarOutputStream = new TarArchiveOutputStream(gzipCompressorOutputStream)) {
for (Path filePath : filePaths) {
TarArchiveEntry tarArchiveEntry = new TarArchiveEntry(filePath.toFile(), filePath.getFileName().toString());
tarOutputStream.putArchiveEntry(tarArchiveEntry);
copy(filePath, tarOutputStream);
tarOutputStream.closeArchiveEntry();
}
tarOutputStream.finish();
} catch (Exception e) {
LOG.error("Error during create compressed bundle", e);
return empty();
} finally {
closeQuietly(byteOutputStream);
}
return Optional.of(byteOutputStream).map(ByteArrayOutputStream::toByteArray);
}
private void cleanup(List<Path> paths) {
paths.stream()
.findFirst()
.map(Path::getParent)
.ifPresent(basePath -> {
try (Stream<Path> walk = walk(basePath)) {
walk.map(Path::toFile).forEach(File::delete);
} catch (IOException e) {
LOG.warn("Unable to clean up temporary directory {}", basePath, e);
}
});
}
}

View File

@ -0,0 +1,251 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.c2.client.service.operation;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.nio.file.Files.write;
import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
import static java.util.Collections.singletonMap;
import static java.util.function.Function.identity;
import static java.util.stream.Collectors.joining;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toMap;
import static org.apache.nifi.c2.client.service.operation.DebugOperationHandler.NEW_LINE;
import static org.apache.nifi.c2.client.service.operation.DebugOperationHandler.TARGET_ARG;
import static org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.FULLY_APPLIED;
import static org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.NOT_APPLIED;
import static org.apache.nifi.c2.protocol.api.OperandType.DEBUG;
import static org.apache.nifi.c2.protocol.api.OperationType.TRANSFER;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
import java.util.stream.Stream;
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
import org.apache.nifi.c2.client.api.C2Client;
import org.apache.nifi.c2.protocol.api.C2Operation;
import org.apache.nifi.c2.protocol.api.C2OperationAck;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
@ExtendWith(MockitoExtension.class)
public class DebugOperationHandlerTest {
private static final String OPERATION_ID = "operationId";
private static final String C2_DEBUG_UPLOAD_ENDPOINT = "https://host/c2/api/upload";
private static final String DEFAULT_FILE_CONTENT = "some_textual_data";
private static final List<Path> VALID_BUNDLE_FILE_LIST = singletonList(Paths.get("path_to_file"));
private static final Predicate<String> DEFAULT_CONTENT_FILTER = text -> true;
@Mock
private C2Client c2Client;
@TempDir
private File tempDir;
private static Stream<Arguments> invalidConstructorArguments() {
C2Client mockC2Client = mock(C2Client.class);
return Stream.of(
Arguments.of(null, null, null),
Arguments.of(null, VALID_BUNDLE_FILE_LIST, DEFAULT_CONTENT_FILTER),
Arguments.of(mockC2Client, null, DEFAULT_CONTENT_FILTER),
Arguments.of(mockC2Client, emptyList(), DEFAULT_CONTENT_FILTER),
Arguments.of(mockC2Client, VALID_BUNDLE_FILE_LIST, null)
);
}
@ParameterizedTest(name = "c2Client={0} bundleFileList={1} contentFilter={2}")
@MethodSource("invalidConstructorArguments")
public void testAttemptingCreateWithInvalidParametersWillThrowException(C2Client c2Client, List<Path> bundleFilePaths, Predicate<String> contentFilter) {
assertThrows(IllegalArgumentException.class, () -> DebugOperationHandler.create(c2Client, bundleFilePaths, contentFilter));
}
@Test
public void testOperationAndOperandTypesAreMatching() {
// given
DebugOperationHandler testHandler = DebugOperationHandler.create(c2Client, VALID_BUNDLE_FILE_LIST, DEFAULT_CONTENT_FILTER);
// when + then
assertEquals(TRANSFER, testHandler.getOperationType());
assertEquals(DEBUG, testHandler.getOperandType());
}
@Test
public void testC2CallbackUrlIsNullInArgs() {
// given
DebugOperationHandler testHandler = DebugOperationHandler.create(c2Client, VALID_BUNDLE_FILE_LIST, DEFAULT_CONTENT_FILTER);
C2Operation c2Operation = operation(null);
// when
C2OperationAck result = testHandler.handle(c2Operation);
// then
assertEquals(OPERATION_ID, result.getOperationId());
assertEquals(NOT_APPLIED, result.getOperationState().getState());
}
@Test
public void testFilesAreCollectedAndUploadedAsATarGzBundle() {
// given
Map<String, String> bundleFileNamesWithContents = Stream.of("file.log", "application.conf", "default.properties")
.collect(toMap(identity(), __ -> DEFAULT_FILE_CONTENT));
List<Path> createBundleFiles = bundleFileNamesWithContents.entrySet().stream()
.map(entry -> placeFileWithContent(entry.getKey(), entry.getValue()))
.collect(toList());
DebugOperationHandler testHandler = DebugOperationHandler.create(c2Client, createBundleFiles, DEFAULT_CONTENT_FILTER);
C2Operation c2Operation = operation(C2_DEBUG_UPLOAD_ENDPOINT);
// when
C2OperationAck result = testHandler.handle(c2Operation);
// then
ArgumentCaptor<String> uploadUrlCaptor = ArgumentCaptor.forClass(String.class);
ArgumentCaptor<byte[]> uploadBundleCaptor = ArgumentCaptor.forClass(byte[].class);
verify(c2Client).uploadBundle(uploadUrlCaptor.capture(), uploadBundleCaptor.capture());
assertEquals(OPERATION_ID, result.getOperationId());
assertEquals(FULLY_APPLIED, result.getOperationState().getState());
assertEquals(C2_DEBUG_UPLOAD_ENDPOINT, uploadUrlCaptor.getValue());
Map<String, String> resultBundle = extractBundle(uploadBundleCaptor.getValue());
assertTrue(mapEqual(bundleFileNamesWithContents, resultBundle));
}
@Test
public void testFileToCollectDoesNotExist() {
// given
DebugOperationHandler testHandler = DebugOperationHandler.create(c2Client, singletonList(Paths.get(tempDir.getAbsolutePath(), "missing_file")), DEFAULT_CONTENT_FILTER);
C2Operation c2Operation = operation(C2_DEBUG_UPLOAD_ENDPOINT);
// when
C2OperationAck result = testHandler.handle(c2Operation);
// then
assertEquals(OPERATION_ID, result.getOperationId());
assertEquals(NOT_APPLIED, result.getOperationState().getState());
}
private static Stream<Arguments> contentFilterArguments() {
String filterKeyword = "minifi";
return Stream.of(
Arguments.of(
"files_containing_keyword_filtered_out.file",
filterKeyword,
Stream.of("line one", "line two " + filterKeyword, filterKeyword + "line three", "line four", "line " + filterKeyword + " five").collect(joining(NEW_LINE)),
Stream.of("line one", "line four").collect(joining(NEW_LINE))),
Arguments.of(
"all_content_filtered_out.file",
filterKeyword,
Stream.of("line one " + filterKeyword, filterKeyword, filterKeyword + "line three",
filterKeyword + "line four" + filterKeyword, "line " + filterKeyword + " five").collect(joining(NEW_LINE)),
""),
Arguments.of(
"all_content_kept.file",
filterKeyword,
Stream.of("line one", "line two", "line three", "line four", "line five").collect(joining(NEW_LINE)),
Stream.of("line one", "line two", "line three", "line four", "line five").collect(joining(NEW_LINE)))
);
}
@ParameterizedTest(name = "file={0}")
@MethodSource("contentFilterArguments")
public void testContentIsFilteredOut(String fileName, String filterKeyword, String inputContent, String expectedContent) {
// given
Path bundleFile = placeFileWithContent(fileName, inputContent);
Predicate<String> testContentFilter = content -> !content.contains(filterKeyword);
DebugOperationHandler testHandler = DebugOperationHandler.create(c2Client, singletonList(bundleFile), testContentFilter);
C2Operation c2Operation = operation(C2_DEBUG_UPLOAD_ENDPOINT);
// when
C2OperationAck result = testHandler.handle(c2Operation);
// then
ArgumentCaptor<String> uploadUrlCaptor = ArgumentCaptor.forClass(String.class);
ArgumentCaptor<byte[]> uploadBundleCaptor = ArgumentCaptor.forClass(byte[].class);
verify(c2Client).uploadBundle(uploadUrlCaptor.capture(), uploadBundleCaptor.capture());
assertEquals(OPERATION_ID, result.getOperationId());
assertEquals(FULLY_APPLIED, result.getOperationState().getState());
assertEquals(C2_DEBUG_UPLOAD_ENDPOINT, uploadUrlCaptor.getValue());
Map<String, String> resultBundle = extractBundle(uploadBundleCaptor.getValue());
assertEquals(1, resultBundle.size());
assertEquals(expectedContent, resultBundle.get(fileName));
}
private Path placeFileWithContent(String fileName, String content) {
Path filePath = Paths.get(tempDir.getAbsolutePath(), fileName);
try {
write(filePath, content.getBytes(UTF_8));
} catch (IOException e) {
throw new UncheckedIOException("Failed to write file to temp directory", e);
}
return filePath;
}
private C2Operation operation(String uploadUrl) {
C2Operation c2Operation = new C2Operation();
c2Operation.setIdentifier(OPERATION_ID);
c2Operation.setArgs(singletonMap(TARGET_ARG, uploadUrl));
return c2Operation;
}
private Map<String, String> extractBundle(byte[] bundle) {
Map<String, String> fileNamesWithContents = new HashMap<>();
try (TarArchiveInputStream tarInputStream = new TarArchiveInputStream(new GzipCompressorInputStream(new ByteArrayInputStream(bundle)))) {
TarArchiveEntry currentEntry;
while ((currentEntry = tarInputStream.getNextTarEntry()) != null) {
fileNamesWithContents.put(
currentEntry.getName(),
new BufferedReader(new InputStreamReader(tarInputStream)).lines().collect(joining(NEW_LINE)));
}
} catch (IOException e) {
throw new UncheckedIOException("Failed to extract bundle", e);
}
return fileNamesWithContents;
}
private boolean mapEqual(Map<String, String> first, Map<String, String> second) {
if (first.size() != second.size()) {
return false;
}
return first.entrySet()
.stream()
.allMatch(e -> e.getValue().equals(second.get(e.getKey())));
}
}

View File

@ -31,8 +31,19 @@ import org.apache.nifi.bootstrap.util.OSUtils;
public class MiNiFiExecCommandProvider {
public static final String LOG_DIR = "org.apache.nifi.minifi.bootstrap.config.log.dir";
public static final String DEFAULT_LOG_DIR = "./logs";
public static final String APP_LOG_FILE_NAME = "org.apache.nifi.minifi.bootstrap.config.log.app.file.name";
public static final String APP_LOG_FILE_EXTENSION = "org.apache.nifi.minifi.bootstrap.config.log.app.file.extension";
public static final String BOOTSTRAP_LOG_FILE_NAME = "org.apache.nifi.minifi.bootstrap.config.log.bootstrap.file.name";
public static final String BOOTSTRAP_LOG_FILE_EXTENSION = "org.apache.nifi.minifi.bootstrap.config.log.bootstrap.file.extension";
public static final String DEFAULT_APP_LOG_FILE_NAME = "minifi-app";
public static final String DEFAULT_BOOTSTRAP_LOG_FILE_NAME = "minifi-bootstrap";
public static final String DEFAULT_LOG_FILE_EXTENSION = "log";
private static final String DEFAULT_JAVA_CMD = "java";
private static final String DEFAULT_LOG_DIR = "./logs";
private static final String DEFAULT_LIB_DIR = "./lib";
private static final String DEFAULT_CONF_DIR = "./conf";
private static final String DEFAULT_CONFIG_FILE = DEFAULT_CONF_DIR + "/bootstrap.conf";
@ -56,7 +67,11 @@ public class MiNiFiExecCommandProvider {
Properties props = bootstrapFileProvider.getBootstrapProperties();
File confDir = getFile(props.getProperty(CONF_DIR_KEY, DEFAULT_CONF_DIR).trim(), workingDir);
File libDir = getFile(props.getProperty("lib.dir", DEFAULT_LIB_DIR).trim(), workingDir);
String minifiLogDir = System.getProperty("org.apache.nifi.minifi.bootstrap.config.log.dir", DEFAULT_LOG_DIR).trim();
String minifiLogDir = System.getProperty(LOG_DIR, DEFAULT_LOG_DIR).trim();
String minifiAppLogFileName = System.getProperty(APP_LOG_FILE_NAME, DEFAULT_APP_LOG_FILE_NAME).trim();
String minifiAppLogFileExtension = System.getProperty(APP_LOG_FILE_EXTENSION, DEFAULT_LOG_FILE_EXTENSION).trim();
String minifiBootstrapLogFileName = System.getProperty(BOOTSTRAP_LOG_FILE_NAME, DEFAULT_BOOTSTRAP_LOG_FILE_NAME).trim();
String minifiBootstrapLogFileExtension = System.getProperty(BOOTSTRAP_LOG_FILE_EXTENSION, DEFAULT_LOG_FILE_EXTENSION).trim();
List<String> cmd = new ArrayList<>();
cmd.add(getJavaCommand(props));
@ -66,7 +81,11 @@ public class MiNiFiExecCommandProvider {
cmd.add("-Dnifi.properties.file.path=" + getMiNiFiPropsFileName(props, confDir));
cmd.add("-Dnifi.bootstrap.listen.port=" + listenPort);
cmd.add("-Dapp=MiNiFi");
cmd.add("-Dorg.apache.nifi.minifi.bootstrap.config.log.dir=" + minifiLogDir);
cmd.add("-D" + LOG_DIR + "=" + minifiLogDir);
cmd.add("-D" + APP_LOG_FILE_NAME + "=" + minifiAppLogFileName);
cmd.add("-D" + APP_LOG_FILE_EXTENSION + "=" + minifiAppLogFileExtension);
cmd.add("-D" + BOOTSTRAP_LOG_FILE_NAME + "=" + minifiBootstrapLogFileName);
cmd.add("-D" + BOOTSTRAP_LOG_FILE_EXTENSION + "=" + minifiBootstrapLogFileExtension);
cmd.add("org.apache.nifi.minifi.MiNiFi");
return cmd;

View File

@ -19,6 +19,15 @@ package org.apache.nifi.minifi.bootstrap.util;
import static org.apache.nifi.minifi.bootstrap.configuration.ingestors.PullHttpChangeIngestor.OVERRIDE_SECURITY;
import static org.apache.nifi.minifi.bootstrap.configuration.ingestors.PullHttpChangeIngestor.PULL_HTTP_BASE_KEY;
import static org.apache.nifi.minifi.bootstrap.service.MiNiFiExecCommandProvider.APP_LOG_FILE_EXTENSION;
import static org.apache.nifi.minifi.bootstrap.service.MiNiFiExecCommandProvider.APP_LOG_FILE_NAME;
import static org.apache.nifi.minifi.bootstrap.service.MiNiFiExecCommandProvider.BOOTSTRAP_LOG_FILE_EXTENSION;
import static org.apache.nifi.minifi.bootstrap.service.MiNiFiExecCommandProvider.BOOTSTRAP_LOG_FILE_NAME;
import static org.apache.nifi.minifi.bootstrap.service.MiNiFiExecCommandProvider.DEFAULT_APP_LOG_FILE_NAME;
import static org.apache.nifi.minifi.bootstrap.service.MiNiFiExecCommandProvider.DEFAULT_BOOTSTRAP_LOG_FILE_NAME;
import static org.apache.nifi.minifi.bootstrap.service.MiNiFiExecCommandProvider.DEFAULT_LOG_DIR;
import static org.apache.nifi.minifi.bootstrap.service.MiNiFiExecCommandProvider.DEFAULT_LOG_FILE_EXTENSION;
import static org.apache.nifi.minifi.bootstrap.service.MiNiFiExecCommandProvider.LOG_DIR;
import java.io.ByteArrayInputStream;
import java.io.FileOutputStream;
@ -47,8 +56,10 @@ import javax.xml.transform.stream.StreamResult;
import org.apache.commons.io.input.TeeInputStream;
import org.apache.commons.io.output.ByteArrayOutputStream;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.minifi.bootstrap.RunMiNiFi;
import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeException;
import org.apache.nifi.minifi.bootstrap.exception.InvalidConfigurationException;
import org.apache.nifi.minifi.bootstrap.service.BootstrapFileProvider;
import org.apache.nifi.minifi.commons.schema.ComponentStatusRepositorySchema;
import org.apache.nifi.minifi.commons.schema.ConfigSchema;
import org.apache.nifi.minifi.commons.schema.ConnectionSchema;
@ -84,13 +95,20 @@ import org.w3c.dom.Document;
import org.w3c.dom.Element;
public final class ConfigTransformer {
// Underlying version of NIFI will be using
public static final String ROOT_GROUP = "Root-Group";
static final String MINIFI_CONFIG_FILE_PATH = "nifi.minifi.config.file";
static final String MINIFI_BOOTSTRAP_FILE_PATH = "nifi.minifi.bootstrap.file";
static final String MINIFI_LOG_DIRECTORY = "nifi.minifi.log.directory";
static final String MINIFI_APP_LOG_FILE = "nifi.minifi.app.log.file";
static final String MINIFI_BOOTSTRAP_LOG_FILE = "nifi.minifi.bootstrap.log.file";
private static final Logger LOGGER = LoggerFactory.getLogger(ConfigTransformer.class);
private static final String OVERRIDE_CORE_PROPERTIES_KEY = PULL_HTTP_BASE_KEY + ".override.core";
private static final Base64.Encoder KEY_ENCODER = Base64.getEncoder().withoutPadding();
private static final int SENSITIVE_PROPERTIES_KEY_LENGTH = 24;
private static final Logger LOGGER = LoggerFactory.getLogger(ConfigTransformer.class);
// Underlying version of NIFI will be using
public static final String ROOT_GROUP = "Root-Group";
// Final util classes should have private constructor
private ConfigTransformer() {
@ -147,6 +165,17 @@ public final class ConfigTransformer {
.filter(entry -> ((String) entry.getKey()).startsWith("c2"))
.forEach(entry -> configSchemaNew.getNifiPropertiesOverrides().putIfAbsent((String) entry.getKey(), (String) entry.getValue()));
// Config files and log files
if (bootstrapProperties != null) {
configSchemaNew.getNifiPropertiesOverrides().putIfAbsent(MINIFI_CONFIG_FILE_PATH, bootstrapProperties.getProperty(RunMiNiFi.MINIFI_CONFIG_FILE_KEY));
}
configSchemaNew.getNifiPropertiesOverrides().putIfAbsent(MINIFI_BOOTSTRAP_FILE_PATH, BootstrapFileProvider.getBootstrapConfFile().getAbsolutePath());
configSchemaNew.getNifiPropertiesOverrides().putIfAbsent(MINIFI_LOG_DIRECTORY, System.getProperty(LOG_DIR, DEFAULT_LOG_DIR).trim());
configSchemaNew.getNifiPropertiesOverrides().putIfAbsent(MINIFI_APP_LOG_FILE,
System.getProperty(APP_LOG_FILE_NAME, DEFAULT_APP_LOG_FILE_NAME).trim() + "." + System.getProperty(APP_LOG_FILE_EXTENSION, DEFAULT_LOG_FILE_EXTENSION).trim());
configSchemaNew.getNifiPropertiesOverrides().putIfAbsent(MINIFI_BOOTSTRAP_LOG_FILE,
System.getProperty(BOOTSTRAP_LOG_FILE_NAME, DEFAULT_BOOTSTRAP_LOG_FILE_NAME).trim() + "." + System.getProperty(BOOTSTRAP_LOG_FILE_EXTENSION, DEFAULT_LOG_FILE_EXTENSION).trim());
// Create nifi.properties and flow.xml.gz in memory
ByteArrayOutputStream nifiPropertiesOutputStream = new ByteArrayOutputStream();
writeNiFiProperties(configSchemaNew, nifiPropertiesOutputStream);

View File

@ -65,6 +65,11 @@ import java.util.Set;
import java.util.stream.Collectors;
import java.util.zip.GZIPInputStream;
import static org.apache.nifi.minifi.bootstrap.util.ConfigTransformer.MINIFI_APP_LOG_FILE;
import static org.apache.nifi.minifi.bootstrap.util.ConfigTransformer.MINIFI_BOOTSTRAP_FILE_PATH;
import static org.apache.nifi.minifi.bootstrap.util.ConfigTransformer.MINIFI_BOOTSTRAP_LOG_FILE;
import static org.apache.nifi.minifi.bootstrap.util.ConfigTransformer.MINIFI_CONFIG_FILE_PATH;
import static org.apache.nifi.minifi.bootstrap.util.ConfigTransformer.MINIFI_LOG_DIRECTORY;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
@ -491,6 +496,28 @@ public class ConfigTransformerTest {
}
@Test
public void checkLogAndConfigFilePathOverrides() throws Exception {
File inputFile = new File("./src/test/resources/config-minimal.yml");
Properties bootstrapProperties = getTestBootstrapProperties("bootstrap.conf");
ConfigTransformer.transformConfigFile(new FileInputStream(inputFile), "./target/", bootstrapProperties);
File nifiPropertiesFile = new File("./target/nifi.properties");
assertTrue(nifiPropertiesFile.exists());
assertTrue(nifiPropertiesFile.canRead());
Properties nifiProperties = new Properties();
nifiProperties.load(new FileInputStream(nifiPropertiesFile));
// an absolute path is returned for bootstrap which is different per every environment. only testing that path ends with expected relative path
assertTrue(nifiProperties.getProperty(MINIFI_BOOTSTRAP_FILE_PATH).endsWith("bootstrap.conf"));
assertEquals("./conf/config.yml", nifiProperties.getProperty(MINIFI_CONFIG_FILE_PATH));
assertEquals("./logs", nifiProperties.getProperty(MINIFI_LOG_DIRECTORY));
assertEquals("minifi-app.log", nifiProperties.getProperty(MINIFI_APP_LOG_FILE));
assertEquals("minifi-bootstrap.log", nifiProperties.getProperty(MINIFI_BOOTSTRAP_LOG_FILE));
}
public void testConfigFileTransform(String configFile) throws Exception {
ConfigSchema configSchema = SchemaLoader.loadConfigSchemaFromYaml(ConfigTransformerTest.class.getClassLoader().getResourceAsStream(configFile));

View File

@ -0,0 +1,131 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Java command to use when running MiNiFi
java=java
# Username to use when running MiNiFi. This value will be ignored on Windows.
run.as=
# Configure where MiNiFi's lib and conf directories live
# When running as a Windows service set full paths instead of relative paths
lib.dir=./lib
conf.dir=./conf
# How long to wait after telling MiNiFi to shutdown before explicitly killing the Process
graceful.shutdown.seconds=20
# The location for the configuration file
# When running as a Windows service use the full path to the file
nifi.minifi.config=./conf/config.yml
# Security Properties #
# These properties take precedence over any equivalent properties specified in config.yml #
nifi.minifi.security.keystore=
nifi.minifi.security.keystoreType=jks
nifi.minifi.security.keystorePasswd=
nifi.minifi.security.keyPasswd=
nifi.minifi.security.truststore=
nifi.minifi.security.truststoreType=
nifi.minifi.security.truststorePasswd=
nifi.minifi.security.ssl.protocol=
nifi.minifi.sensitive.props.key=
nifi.minifi.sensitive.props.algorithm=
# Provenance Reporting Properties #
# These properties take precedence over any equivalent properties specified in the config.yml #
nifi.minifi.provenance.reporting.comment=
nifi.minifi.provenance.reporting.scheduling.strategy=
nifi.minifi.provenance.reporting.scheduling.period=
nifi.minifi.provenance.reporting.destination.url=
nifi.minifi.provenance.reporting.input.port.name=
nifi.minifi.provenance.reporting.instance.url=
nifi.minifi.provenance.reporting.batch.size=
nifi.minifi.provenance.reporting.communications.timeout=
# Ignore custom SSL controller services and use parent minifi SSL
nifi.minifi.flow.use.parent.ssl=true
# Notifiers to use for the associated agent, comma separated list of class names
#nifi.minifi.notifier.ingestors=org.apache.nifi.minifi.bootstrap.configuration.ingestors.FileChangeIngestor
#nifi.minifi.notifier.ingestors=org.apache.nifi.minifi.bootstrap.configuration.ingestors.RestChangeIngestor
#nifi.minifi.notifier.ingestors=org.apache.nifi.minifi.bootstrap.configuration.ingestors.PullHttpChangeIngestor
# File change notifier configuration
# Path of the file to monitor for changes. When these occur, the FileChangeNotifier, if configured, will begin the configuration reloading process
#nifi.minifi.notifier.ingestors.file.config.path=
# How frequently the file specified by 'nifi.minifi.notifier.file.config.path' should be evaluated for changes.
#nifi.minifi.notifier.ingestors.file.polling.period.seconds=5
# Rest change notifier configuration
# Port on which the Jetty server will bind to, keep commented for a random open port
#nifi.minifi.notifier.ingestors.receive.http.port=8338
#Pull HTTP change notifier configuration
# Hostname on which to pull configurations from
#nifi.minifi.notifier.ingestors.pull.http.hostname=localhost
# Port on which to pull configurations from
#nifi.minifi.notifier.ingestors.pull.http.port=4567
# Path to pull configurations from
#nifi.minifi.notifier.ingestors.pull.http.path=/c2/config
# Query string to pull configurations with
#nifi.minifi.notifier.ingestors.pull.http.query=class=raspi3
# Period on which to pull configurations from, defaults to 5 minutes if commented out
#nifi.minifi.notifier.ingestors.pull.http.period.ms=300000
# Periodic Status Reporters to use for the associated agent, comma separated list of class names
#nifi.minifi.status.reporter.components=org.apache.nifi.minifi.bootstrap.status.reporters.StatusLogger
# Periodic Status Logger configuration
# The FlowStatus query to submit to the MiNiFi instance
#nifi.minifi.status.reporter.log.query=instance:health,bulletins
# The log level at which the status will be logged
#nifi.minifi.status.reporter.log.level=INFO
# The period (in milliseconds) at which to log the status
#nifi.minifi.status.reporter.log.period=60000
# Disable JSR 199 so that we can use JSP's without running a JDK
java.arg.1=-Dorg.apache.jasper.compiler.disablejsr199=true
# JVM memory settings
java.arg.2=-Xms256m
java.arg.3=-Xmx256m
# Enable Remote Debugging
#java.arg.debug=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=8000
java.arg.4=-Djava.net.preferIPv4Stack=true
# allowRestrictedHeaders is required for Cluster/Node communications to work properly
java.arg.5=-Dsun.net.http.allowRestrictedHeaders=true
java.arg.6=-Djava.protocol.handler.pkgs=sun.net.www.protocol
# Sets the provider of SecureRandom to /dev/urandom to prevent blocking on VMs
java.arg.7=-Djava.security.egd=file:/dev/urandom
# The G1GC is still considered experimental but has proven to be very advantageous in providing great
# performance without significant "stop-the-world" delays.
#java.arg.13=-XX:+UseG1GC
#Set headless mode by default
java.arg.14=-Djava.awt.headless=true

View File

@ -26,4 +26,8 @@ rem The directory for the NiFi pid file
set MINIFI_PID_DIR=%MINIFI_ROOT%\run
rem The directory for NiFi log files
set MINIFI_LOG_DIR=%MINIFI_ROOT%\logs
set MINIFI_LOG_DIR=%MINIFI_ROOT%\logs
set MINIFI_APP_LOG_FILE_NAME="minifi-app"
set MINIFI_APP_LOG_FILE_EXTENSION="log"
set MINIFI_BOOTSTRAP_LOG_FILE_NAME="minifi-bootstrap"
set MINIFI_BOOTSTRAP_LOG_FILE_EXTENSION="log"

View File

@ -21,8 +21,14 @@
export MINIFI_HOME=$(cd "${SCRIPT_DIR}" && cd .. && pwd)
#The directory for the NiFi pid file
#The directory for the MiNiFi pid file
export MINIFI_PID_DIR="${MINIFI_HOME}/run"
#The directory for NiFi log files
export MINIFI_LOG_DIR="${MINIFI_HOME}/logs"
#The directory for MiNiFi log files
export MINIFI_LOG_DIR="${MINIFI_HOME}/logs"
# MiNiFi log file names and extensions
export MINIFI_APP_LOG_FILE_NAME=minifi-app
export MINIFI_APP_LOG_FILE_EXTENSION=log
export MINIFI_BOOTSTRAP_LOG_FILE_NAME=minifi-bootstrap
export MINIFI_BOOTSTRAP_LOG_FILE_EXTENSION=log

View File

@ -303,7 +303,7 @@ run() {
#setup directory parameters
BOOTSTRAP_LOG_PARAMS="-Dorg.apache.nifi.minifi.bootstrap.config.log.dir="\""${MINIFI_LOG_DIR}"\"""
BOOTSTRAP_LOG_PARAMS="-Dorg.apache.nifi.minifi.bootstrap.config.log.dir="\""${MINIFI_LOG_DIR}"\"" -Dorg.apache.nifi.minifi.bootstrap.config.log.app.file.name="\""${MINIFI_APP_LOG_FILE_NAME}"\"" -Dorg.apache.nifi.minifi.bootstrap.config.log.app.file.extension="\""${MINIFI_APP_LOG_FILE_EXTENSION}"\"" -Dorg.apache.nifi.minifi.bootstrap.config.log.bootstrap.file.name="\""${MINIFI_BOOTSTRAP_LOG_FILE_NAME}"\"" -Dorg.apache.nifi.minifi.bootstrap.config.log.bootstrap.file.extension="\""${MINIFI_BOOTSTRAP_LOG_FILE_EXTENSION}"\"""
BOOTSTRAP_PID_PARAMS="-Dorg.apache.nifi.minifi.bootstrap.config.pid.dir="\""${MINIFI_PID_DIR}"\"""
BOOTSTRAP_CONF_PARAMS="-Dorg.apache.nifi.minifi.bootstrap.config.file="\""${BOOTSTRAP_CONF}"\"""

View File

@ -42,7 +42,7 @@ set LIB_DIR=lib
set CONF_DIR=conf
set BOOTSTRAP_CONF_FILE=%CONF_DIR%\bootstrap.conf
set JAVA_ARGS=-Dorg.apache.nifi.minifi.bootstrap.config.log.dir=%MINIFI_LOG_DIR% -Dorg.apache.nifi.minifi.bootstrap.config.pid.dir=%MINIFI_PID_DIR% -Dorg.apache.nifi.minifi.bootstrap.config.file=%BOOTSTRAP_CONF_FILE%
set JAVA_ARGS=-Dorg.apache.nifi.minifi.bootstrap.config.log.dir=%MINIFI_LOG_DIR% -Dorg.apache.nifi.minifi.bootstrap.config.log.app.file.name=%MINIFI_APP_LOG_FILE_NAME% -Dorg.apache.nifi.minifi.bootstrap.config.log.app.file.extension=%MINIFI_APP_LOG_FILE_EXTENSION% -Dorg.apache.nifi.minifi.bootstrap.config.log.bootstrap.file.name=%MINIFI_BOOTSTRAP_LOG_FILE_NAME% -Dorg.apache.nifi.minifi.bootstrap.config.log.bootstrap.file.extension=%MINIFI_BOOTSTRAP_LOG_FILE_EXTENSION% -Dorg.apache.nifi.minifi.bootstrap.config.pid.dir=%MINIFI_PID_DIR% -Dorg.apache.nifi.minifi.bootstrap.config.file=%BOOTSTRAP_CONF_FILE%
set JAVA_PARAMS=-cp %CONF_DIR%;%BOOTSTRAP_LIB_DIR%\*;%LIB_DIR%\* -Xms12m -Xmx24m %JAVA_ARGS% org.apache.nifi.minifi.bootstrap.RunMiNiFi
set BOOTSTRAP_ACTION=run

View File

@ -20,7 +20,7 @@
</contextListener>
<appender name="APP_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${org.apache.nifi.minifi.bootstrap.config.log.dir}/minifi-app.log</file>
<file>${org.apache.nifi.minifi.bootstrap.config.log.dir}/${org.apache.nifi.minifi.bootstrap.config.log.app.file.name}.${org.apache.nifi.minifi.bootstrap.config.log.app.file.extension}</file>
<rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
<!--
For daily rollover, use 'app_%d.log'.
@ -28,7 +28,7 @@
To GZIP rolled files, replace '.log' with '.log.gz'.
To ZIP rolled files, replace '.log' with '.log.zip'.
-->
<fileNamePattern>${org.apache.nifi.minifi.bootstrap.config.log.dir}/minifi-app_%d{yyyy-MM-dd_HH}.%i.log.gz</fileNamePattern>
<fileNamePattern>${org.apache.nifi.minifi.bootstrap.config.log.dir}/${org.apache.nifi.minifi.bootstrap.config.log.app.file.name}_%d{yyyy-MM-dd_HH}.%i.${org.apache.nifi.minifi.bootstrap.config.log.app.file.extension}.gz</fileNamePattern>
<!-- Keep 10 rolling periods worth of log files-->
<maxHistory>10</maxHistory>
<!-- Max size each log file will be-->
@ -43,7 +43,7 @@
</appender>
<appender name="BOOTSTRAP_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${org.apache.nifi.minifi.bootstrap.config.log.dir}/minifi-bootstrap.log</file>
<file>${org.apache.nifi.minifi.bootstrap.config.log.dir}/${org.apache.nifi.minifi.bootstrap.config.log.bootstrap.file.name}.${org.apache.nifi.minifi.bootstrap.config.log.bootstrap.file.extension}</file>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<!--
For daily rollover, use 'bootstrap_%d.log'.
@ -51,7 +51,7 @@
To GZIP rolled files, replace '.log' with '.log.gz'.
To ZIP rolled files, replace '.log' with '.log.zip'.
-->
<fileNamePattern>${org.apache.nifi.minifi.bootstrap.config.log.dir}/minifi-bootstrap_%d.log.gz</fileNamePattern>
<fileNamePattern>${org.apache.nifi.minifi.bootstrap.config.log.dir}/${org.apache.nifi.minifi.bootstrap.config.log.bootstrap.file.name}_%d.${org.apache.nifi.minifi.bootstrap.config.log.bootstrap.file.extension}.gz</fileNamePattern>
<!-- Keep 5 rolling periods worth of logs-->
<maxHistory>5</maxHistory>
</rollingPolicy>

View File

@ -14,15 +14,32 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.c2;
import static java.util.Optional.ofNullable;
import static java.util.stream.Collectors.collectingAndThen;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toSet;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Optional;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.stream.Stream;
import org.apache.nifi.c2.client.C2ClientConfig;
import org.apache.nifi.c2.client.http.C2HttpClient;
import org.apache.nifi.c2.client.service.C2ClientService;
@ -30,13 +47,14 @@ import org.apache.nifi.c2.client.service.C2HeartbeatFactory;
import org.apache.nifi.c2.client.service.FlowIdHolder;
import org.apache.nifi.c2.client.service.model.RuntimeInfoWrapper;
import org.apache.nifi.c2.client.service.operation.C2OperationService;
import org.apache.nifi.c2.client.service.operation.DebugOperationHandler;
import org.apache.nifi.c2.client.service.operation.DescribeManifestOperationHandler;
import org.apache.nifi.c2.client.service.operation.UpdateConfigurationOperationHandler;
import org.apache.nifi.c2.protocol.api.AgentRepositories;
import org.apache.nifi.c2.protocol.api.AgentRepositoryStatus;
import org.apache.nifi.c2.protocol.api.FlowQueueStatus;
import org.apache.nifi.c2.serializer.C2JacksonSerializer;
import org.apache.nifi.controller.FlowController;;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.status.ConnectionStatus;
import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.diagnostics.StorageUsage;
@ -51,16 +69,26 @@ import org.apache.nifi.util.NiFiProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class C2NifiClientService {
public static final Set<String> SENSITIVE_PROPERTY_KEYWORDS =
Stream.of("key:", "algorithm:", "secret.key", "sensitive.props.key", "sensitive.props.algorithm", "secret", "password", "passwd")
.map(String::toLowerCase)
.collect(collectingAndThen(toSet(), Collections::unmodifiableSet));
public static final Predicate<String> EXCLUDE_SENSITIVE_TEXT = text ->
ofNullable(text)
.map(String::toLowerCase)
.map(t -> SENSITIVE_PROPERTY_KEYWORDS.stream().noneMatch(keyword -> t.contains(keyword)))
.orElse(true);
private static final String MINIFI_CONFIG_FILE_PATH = "nifi.minifi.config.file";
private static final String MINIFI_BOOTSTRAP_FILE_PATH = "nifi.minifi.bootstrap.file";
private static final String MINIFI_LOG_DIRECTORY = "nifi.minifi.log.directory";
private static final String MINIFI_APP_LOG_FILE = "nifi.minifi.app.log.file";
private static final String MINIFI_BOOTSTRAP_LOG_FILE = "nifi.minifi.bootstrap.log.file";
private static final Logger logger = LoggerFactory.getLogger(C2NifiClientService.class);
private static final String DEFAULT_CONF_DIR = "./conf";
private static final String TARGET_CONFIG_FILE = "/config-new.yml";
private static final String ROOT_GROUP_ID = "root";
@ -96,37 +124,38 @@ public class C2NifiClientService {
heartbeatFactory,
new C2OperationService(Arrays.asList(
new UpdateConfigurationOperationHandler(client, flowIdHolder, this::updateFlowContent),
new DescribeManifestOperationHandler(heartbeatFactory, this::generateRuntimeInfo)
new DescribeManifestOperationHandler(heartbeatFactory, this::generateRuntimeInfo),
DebugOperationHandler.create(client, debugBundleFiles(niFiProperties), EXCLUDE_SENSITIVE_TEXT)
))
);
}
private C2ClientConfig generateClientConfig(NiFiProperties properties) {
return new C2ClientConfig.Builder()
.agentClass(properties.getProperty(C2NiFiProperties.C2_AGENT_CLASS_KEY, ""))
.agentIdentifier(properties.getProperty(C2NiFiProperties.C2_AGENT_IDENTIFIER_KEY))
.fullHeartbeat(Boolean.parseBoolean(properties.getProperty(C2NiFiProperties.C2_FULL_HEARTBEAT_KEY, "true")))
.heartbeatPeriod(Long.parseLong(properties.getProperty(C2NiFiProperties.C2_AGENT_HEARTBEAT_PERIOD_KEY,
String.valueOf(C2NiFiProperties.C2_AGENT_DEFAULT_HEARTBEAT_PERIOD))))
.connectTimeout((long) FormatUtils.getPreciseTimeDuration(properties.getProperty(C2NiFiProperties.C2_CONNECTION_TIMEOUT,
C2NiFiProperties.C2_DEFAULT_CONNECTION_TIMEOUT), TimeUnit.MILLISECONDS))
.readTimeout((long) FormatUtils.getPreciseTimeDuration(properties.getProperty(C2NiFiProperties.C2_READ_TIMEOUT,
C2NiFiProperties.C2_DEFAULT_READ_TIMEOUT), TimeUnit.MILLISECONDS))
.callTimeout((long) FormatUtils.getPreciseTimeDuration(properties.getProperty(C2NiFiProperties.C2_CALL_TIMEOUT,
C2NiFiProperties.C2_DEFAULT_CALL_TIMEOUT), TimeUnit.MILLISECONDS))
.c2Url(properties.getProperty(C2NiFiProperties.C2_REST_URL_KEY, ""))
.c2RequestCompression(properties.getProperty(C2NiFiProperties.C2_REQUEST_COMPRESSION_KEY, C2NiFiProperties.C2_REQUEST_COMPRESSION))
.agentClass(properties.getProperty(C2NiFiProperties.C2_AGENT_CLASS_KEY, ""))
.agentIdentifier(properties.getProperty(C2NiFiProperties.C2_AGENT_IDENTIFIER_KEY))
.fullHeartbeat(Boolean.parseBoolean(properties.getProperty(C2NiFiProperties.C2_FULL_HEARTBEAT_KEY, "true")))
.heartbeatPeriod(Long.parseLong(properties.getProperty(C2NiFiProperties.C2_AGENT_HEARTBEAT_PERIOD_KEY,
String.valueOf(C2NiFiProperties.C2_AGENT_DEFAULT_HEARTBEAT_PERIOD))))
.connectTimeout((long) FormatUtils.getPreciseTimeDuration(properties.getProperty(C2NiFiProperties.C2_CONNECTION_TIMEOUT,
C2NiFiProperties.C2_DEFAULT_CONNECTION_TIMEOUT), TimeUnit.MILLISECONDS))
.readTimeout((long) FormatUtils.getPreciseTimeDuration(properties.getProperty(C2NiFiProperties.C2_READ_TIMEOUT,
C2NiFiProperties.C2_DEFAULT_READ_TIMEOUT), TimeUnit.MILLISECONDS))
.callTimeout((long) FormatUtils.getPreciseTimeDuration(properties.getProperty(C2NiFiProperties.C2_CALL_TIMEOUT,
C2NiFiProperties.C2_DEFAULT_CALL_TIMEOUT), TimeUnit.MILLISECONDS))
.c2Url(properties.getProperty(C2NiFiProperties.C2_REST_URL_KEY, ""))
.c2RequestCompression(properties.getProperty(C2NiFiProperties.C2_REQUEST_COMPRESSION_KEY, C2NiFiProperties.C2_REQUEST_COMPRESSION))
.confDirectory(properties.getProperty(C2NiFiProperties.C2_CONFIG_DIRECTORY_KEY, DEFAULT_CONF_DIR))
.runtimeManifestIdentifier(properties.getProperty(C2NiFiProperties.C2_RUNTIME_MANIFEST_IDENTIFIER_KEY, ""))
.runtimeType(properties.getProperty(C2NiFiProperties.C2_RUNTIME_TYPE_KEY, ""))
.c2AckUrl(properties.getProperty(C2NiFiProperties.C2_REST_URL_ACK_KEY, ""))
.truststoreFilename(properties.getProperty(C2NiFiProperties.TRUSTSTORE_LOCATION_KEY, ""))
.truststorePassword(properties.getProperty(C2NiFiProperties.TRUSTSTORE_PASSWORD_KEY, ""))
.truststoreType(properties.getProperty(C2NiFiProperties.TRUSTSTORE_TYPE_KEY, "JKS"))
.keystoreFilename(properties.getProperty(C2NiFiProperties.KEYSTORE_LOCATION_KEY, ""))
.keystorePassword(properties.getProperty(C2NiFiProperties.KEYSTORE_PASSWORD_KEY, ""))
.keystoreType(properties.getProperty(C2NiFiProperties.KEYSTORE_TYPE_KEY, "JKS"))
.build();
.runtimeManifestIdentifier(properties.getProperty(C2NiFiProperties.C2_RUNTIME_MANIFEST_IDENTIFIER_KEY, ""))
.runtimeType(properties.getProperty(C2NiFiProperties.C2_RUNTIME_TYPE_KEY, ""))
.c2AckUrl(properties.getProperty(C2NiFiProperties.C2_REST_URL_ACK_KEY, ""))
.truststoreFilename(properties.getProperty(C2NiFiProperties.TRUSTSTORE_LOCATION_KEY, ""))
.truststorePassword(properties.getProperty(C2NiFiProperties.TRUSTSTORE_PASSWORD_KEY, ""))
.truststoreType(properties.getProperty(C2NiFiProperties.TRUSTSTORE_TYPE_KEY, "JKS"))
.keystoreFilename(properties.getProperty(C2NiFiProperties.KEYSTORE_LOCATION_KEY, ""))
.keystorePassword(properties.getProperty(C2NiFiProperties.KEYSTORE_PASSWORD_KEY, ""))
.keystoreType(properties.getProperty(C2NiFiProperties.KEYSTORE_TYPE_KEY, "JKS"))
.build();
}
public void start() {
@ -209,10 +238,21 @@ public class C2NifiClientService {
}
private File getTargetConfigFile() {
return Optional.ofNullable(propertiesDir)
return ofNullable(propertiesDir)
.map(File::new)
.map(File::getParent)
.map(parentDir -> new File(parentDir + TARGET_CONFIG_FILE))
.orElse( new File(DEFAULT_CONF_DIR + TARGET_CONFIG_FILE));
.orElse(new File(DEFAULT_CONF_DIR + TARGET_CONFIG_FILE));
}
private List<Path> debugBundleFiles(NiFiProperties properties) {
return Stream.of(
Paths.get(properties.getProperty(MINIFI_CONFIG_FILE_PATH)),
Paths.get(properties.getProperty(MINIFI_BOOTSTRAP_FILE_PATH)),
Paths.get(properties.getProperty(MINIFI_LOG_DIRECTORY), properties.getProperty(MINIFI_APP_LOG_FILE)),
Paths.get(properties.getProperty(MINIFI_LOG_DIRECTORY), properties.getProperty(MINIFI_BOOTSTRAP_LOG_FILE)))
.filter(Files::exists)
.filter(Files::isRegularFile)
.collect(toList());
}
}

View File

@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.c2;
import static org.apache.nifi.c2.C2NifiClientService.EXCLUDE_SENSITIVE_TEXT;
import static org.junit.jupiter.api.Assertions.assertEquals;
import java.util.stream.Stream;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
public class C2NifiClientServiceTest {
private static Stream<Arguments> textAndExpectedResult() {
return Stream.of(
Arguments.of("nifi.minifi.sensitive.props.key", false),
Arguments.of("nifi.minifi.sensitive.props.algorithm", false),
Arguments.of("key:", false),
Arguments.of("algorithm:", false),
Arguments.of("secret.key", false),
Arguments.of("c2.security.truststore.password", false),
Arguments.of("c2.security.keystore.password", false),
Arguments.of("nifi.minifi.security.keystorePasswd", false),
Arguments.of("nifi.minifi.security.truststorePasswd", false),
Arguments.of("nifi.minifi.security.keystore", true),
Arguments.of("nifi.minifi.security.truststore", true),
Arguments.of("nifi.minifi.flow.use.parent.ssl", true),
Arguments.of("nifi.minifi.status.reporter", true),
Arguments.of("nifi.minifi.security.ssl.protocol", true),
Arguments.of("", true),
Arguments.of(null, true),
Arguments.of("nifi", true)
);
}
@ParameterizedTest
@MethodSource("textAndExpectedResult")
public void testSensitiveTextIsExcluded(String propertyName, boolean expectedResult) {
assertEquals(expectedResult, EXCLUDE_SENSITIVE_TEXT.test(propertyName));
}
}