diff --git a/c2/c2-client-bundle/c2-client-api/src/main/java/org/apache/nifi/c2/client/api/C2Client.java b/c2/c2-client-bundle/c2-client-api/src/main/java/org/apache/nifi/c2/client/api/C2Client.java index e9f61e1e94..c42b24a3b1 100644 --- a/c2/c2-client-bundle/c2-client-api/src/main/java/org/apache/nifi/c2/client/api/C2Client.java +++ b/c2/c2-client-bundle/c2-client-api/src/main/java/org/apache/nifi/c2/client/api/C2Client.java @@ -35,7 +35,7 @@ public interface C2Client { Optional publishHeartbeat(C2Heartbeat heartbeat); /** - * Retrive the content of the new flow from the C2 Server + * Retrieve the content of the new flow from the C2 Server * * @param flowUpdateUrl url where the content should be downloaded from * @return the actual downloaded content. Will be empty if no content can be downloaded @@ -48,4 +48,13 @@ public interface C2Client { * @param operationAck the acknowledge details to be sent */ void acknowledgeOperation(C2OperationAck operationAck); + + /** + * Uploads a binary bundle to C2 server + * + * @param callbackUrl url where the content should be uploaded to + * @param bundle bundle content as byte array to be uploaded + * @return optional error message if any issues occurred + */ + Optional uploadBundle(String callbackUrl, byte[] bundle); } diff --git a/c2/c2-client-bundle/c2-client-http/src/main/java/org/apache/nifi/c2/client/http/C2HttpClient.java b/c2/c2-client-bundle/c2-client-http/src/main/java/org/apache/nifi/c2/client/http/C2HttpClient.java index e9871276a8..bf9c083a0a 100644 --- a/c2/c2-client-bundle/c2-client-http/src/main/java/org/apache/nifi/c2/client/http/C2HttpClient.java +++ b/c2/c2-client-bundle/c2-client-http/src/main/java/org/apache/nifi/c2/client/http/C2HttpClient.java @@ -17,6 +17,9 @@ package org.apache.nifi.c2.client.http; +import static okhttp3.MultipartBody.FORM; +import static okhttp3.RequestBody.create; + import java.io.FileInputStream; import java.io.IOException; import java.security.KeyStore; @@ -31,9 +34,9 @@ import javax.net.ssl.TrustManager; import javax.net.ssl.TrustManagerFactory; import javax.net.ssl.X509TrustManager; import okhttp3.MediaType; +import okhttp3.MultipartBody; import okhttp3.OkHttpClient; import okhttp3.Request; -import okhttp3.RequestBody; import okhttp3.Response; import okhttp3.ResponseBody; import okhttp3.logging.HttpLoggingInterceptor; @@ -51,6 +54,9 @@ public class C2HttpClient implements C2Client { static final MediaType MEDIA_TYPE_APPLICATION_JSON = MediaType.parse("application/json"); private static final Logger logger = LoggerFactory.getLogger(C2HttpClient.class); + private static final String MULTIPART_FORM_FILE_FIELD_NAME = "file"; + private static final String BUNDLE_FILE_NAME = "debug.tar.gz"; + private static final MediaType BUNDLE_MIME_TYPE = MediaType.parse("application/gzip"); private final AtomicReference httpClientReference = new AtomicReference<>(); private final C2ClientConfig clientConfig; @@ -123,18 +129,41 @@ public class C2HttpClient implements C2Client { @Override public void acknowledgeOperation(C2OperationAck operationAck) { - logger.info("Acknowledging Operation [{}] C2 URL [{}]", operationAck.getOperationId(), clientConfig.getC2AckUrl()); + logger.info("Acknowledging Operation {} to C2 server {}", operationAck.getOperationId(), clientConfig.getC2AckUrl()); serializer.serialize(operationAck) - .map(operationAckBody -> RequestBody.create(operationAckBody, MEDIA_TYPE_APPLICATION_JSON)) + .map(operationAckBody -> create(operationAckBody, MEDIA_TYPE_APPLICATION_JSON)) .map(requestBody -> new Request.Builder().post(requestBody).url(clientConfig.getC2AckUrl()).build()) .map(C2RequestCompression.forType(clientConfig.getC2RequestCompression())::compress) .ifPresent(this::sendAck); } + @Override + public Optional uploadBundle(String callbackUrl, byte[] bundle) { + Request request = new Request.Builder() + .url(callbackUrl) + .post(new MultipartBody.Builder() + .setType(FORM) + .addFormDataPart(MULTIPART_FORM_FILE_FIELD_NAME, BUNDLE_FILE_NAME, create(bundle, BUNDLE_MIME_TYPE)) + .build()) + .build(); + + logger.info("Uploading bundle to C2 server {} with size {}", callbackUrl, bundle.length); + try (Response response = httpClientReference.get().newCall(request).execute()) { + if (!response.isSuccessful()) { + logger.error("Upload bundle failed to C2 server {} with status code {}", callbackUrl, response.code()); + return Optional.of("Upload bundle failed to C2 server"); + } + } catch (IOException e) { + logger.error("Could not upload bundle to C2 server {}", callbackUrl, e); + return Optional.of("Could not upload bundle to C2 server"); + } + return Optional.empty(); + } + private Optional sendHeartbeat(String heartbeat) { Optional c2HeartbeatResponse = Optional.empty(); Request request = new Request.Builder() - .post(RequestBody.create(heartbeat, MEDIA_TYPE_APPLICATION_JSON)) + .post(create(heartbeat, MEDIA_TYPE_APPLICATION_JSON)) .url(clientConfig.getC2Url()) .build(); @@ -143,7 +172,7 @@ public class C2HttpClient implements C2Client { try (Response heartbeatResponse = httpClientReference.get().newCall(decoratedRequest).execute()) { c2HeartbeatResponse = getResponseBody(heartbeatResponse).flatMap(response -> serializer.deserialize(response, C2HeartbeatResponse.class)); } catch (IOException ce) { - logger.error("Send Heartbeat failed [{}]", clientConfig.getC2Url(), ce); + logger.error("Send Heartbeat failed to C2 server {}", clientConfig.getC2Url(), ce); } return c2HeartbeatResponse; @@ -243,10 +272,10 @@ public class C2HttpClient implements C2Client { private void sendAck(Request request) { try (Response heartbeatResponse = httpClientReference.get().newCall(request).execute()) { if (!heartbeatResponse.isSuccessful()) { - logger.warn("Acknowledgement was not successful with c2 server [{}] with status code {}", clientConfig.getC2AckUrl(), heartbeatResponse.code()); + logger.warn("Acknowledgement was not successful with C2 server {} with status code {}", clientConfig.getC2AckUrl(), heartbeatResponse.code()); } } catch (IOException e) { - logger.error("Could not transmit ack to c2 server [{}]", clientConfig.getC2AckUrl(), e); + logger.error("Could not transmit ack to C2 server {}", clientConfig.getC2AckUrl(), e); } } } diff --git a/c2/c2-client-bundle/c2-client-service/pom.xml b/c2/c2-client-bundle/c2-client-service/pom.xml index 8214fef93f..40478797dd 100644 --- a/c2/c2-client-bundle/c2-client-service/pom.xml +++ b/c2/c2-client-bundle/c2-client-service/pom.xml @@ -43,5 +43,10 @@ limitations under the License. c2-client-http 1.18.0-SNAPSHOT + + org.apache.commons + commons-compress + 1.21 + \ No newline at end of file diff --git a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/DebugOperationHandler.java b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/DebugOperationHandler.java new file mode 100644 index 0000000000..4daa369e69 --- /dev/null +++ b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/DebugOperationHandler.java @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.c2.client.service.operation; + +import static java.nio.file.Files.copy; +import static java.nio.file.Files.createTempDirectory; +import static java.nio.file.Files.lines; +import static java.nio.file.Files.walk; +import static java.util.Optional.empty; +import static java.util.Optional.ofNullable; +import static org.apache.commons.compress.utils.IOUtils.closeQuietly; +import static org.apache.commons.lang3.StringUtils.EMPTY; +import static org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.FULLY_APPLIED; +import static org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.NOT_APPLIED; +import static org.apache.nifi.c2.protocol.api.OperandType.DEBUG; +import static org.apache.nifi.c2.protocol.api.OperationType.TRANSFER; + +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.function.Predicate; +import java.util.stream.Stream; +import org.apache.commons.compress.archivers.tar.TarArchiveEntry; +import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream; +import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream; +import org.apache.nifi.c2.client.api.C2Client; +import org.apache.nifi.c2.protocol.api.C2Operation; +import org.apache.nifi.c2.protocol.api.C2OperationAck; +import org.apache.nifi.c2.protocol.api.C2OperationState; +import org.apache.nifi.c2.protocol.api.C2OperationState.OperationState; +import org.apache.nifi.c2.protocol.api.OperandType; +import org.apache.nifi.c2.protocol.api.OperationType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class DebugOperationHandler implements C2OperationHandler { + + private static final Logger LOG = LoggerFactory.getLogger(DebugOperationHandler.class); + + private static final String C2_CALLBACK_URL_NOT_FOUND = "C2 Server callback URL was not found in request"; + private static final String SUCCESSFUL_UPLOAD = "Debug bundle was uploaded successfully"; + private static final String UNABLE_TO_CREATE_BUNDLE = "Unable to create debug bundle"; + + static final String TARGET_ARG = "target"; + static final String NEW_LINE = "\n"; + + private final C2Client c2Client; + private final List bundleFilePaths; + private final Predicate contentFilter; + + private DebugOperationHandler(C2Client c2Client, List bundleFilePaths, Predicate contentFilter) { + this.c2Client = c2Client; + this.bundleFilePaths = bundleFilePaths; + this.contentFilter = contentFilter; + } + + public static DebugOperationHandler create(C2Client c2Client, List bundleFilePaths, Predicate contentFilter) { + if (c2Client == null) { + throw new IllegalArgumentException("C2Client should not be null"); + } + if (bundleFilePaths == null || bundleFilePaths.isEmpty()) { + throw new IllegalArgumentException("bundleFilePaths should not be not null or empty"); + } + if (contentFilter == null) { + throw new IllegalArgumentException("Content filter should not be null"); + } + + return new DebugOperationHandler(c2Client, bundleFilePaths, contentFilter); + } + + @Override + public OperationType getOperationType() { + return TRANSFER; + } + + @Override + public OperandType getOperandType() { + return DEBUG; + } + + @Override + public C2OperationAck handle(C2Operation operation) { + String debugCallbackUrl = operation.getArgs().get(TARGET_ARG); + if (debugCallbackUrl == null) { + LOG.error("Callback URL was not found in C2 request."); + return operationAck(operation, operationState(NOT_APPLIED, C2_CALLBACK_URL_NOT_FOUND)); + } + + List contentFilteredFilePaths = null; + C2OperationState operationState; + try { + contentFilteredFilePaths = filterContent(operation.getIdentifier(), bundleFilePaths); + operationState = createDebugBundle(contentFilteredFilePaths) + .map(bundle -> c2Client.uploadBundle(debugCallbackUrl, bundle) + .map(errorMessage -> operationState(NOT_APPLIED, errorMessage)) + .orElseGet(() -> operationState(FULLY_APPLIED, SUCCESSFUL_UPLOAD))) + .orElseGet(() -> operationState(NOT_APPLIED, UNABLE_TO_CREATE_BUNDLE)); + } catch (Exception e) { + LOG.error("Unexpected error happened", e); + operationState = operationState(NOT_APPLIED, UNABLE_TO_CREATE_BUNDLE); + } finally { + ofNullable(contentFilteredFilePaths).ifPresent(this::cleanup); + } + + LOG.debug("Returning operation ack for operation {} with state {} and details {}", operation.getIdentifier(), operationState.getState(), operationState.getDetails()); + return operationAck(operation, operationState); + } + + private C2OperationAck operationAck(C2Operation operation, C2OperationState state) { + C2OperationAck operationAck = new C2OperationAck(); + operationAck.setOperationId(ofNullable(operation.getIdentifier()).orElse(EMPTY)); + operationAck.setOperationState(state); + return operationAck; + } + + private C2OperationState operationState(OperationState operationState, String details) { + C2OperationState state = new C2OperationState(); + state.setState(operationState); + state.setDetails(details); + return state; + } + + private List filterContent(String operationId, List bundleFilePaths) { + List contentFilteredFilePaths = new ArrayList<>(); + for (Path path : bundleFilePaths) { + String fileName = path.getFileName().toString(); + try (Stream fileStream = lines(path)) { + Path tempDirectory = createTempDirectory(operationId); + Path tempFile = Paths.get(tempDirectory.toAbsolutePath().toString(), fileName); + Files.write(tempFile, (Iterable) fileStream.filter(contentFilter)::iterator); + contentFilteredFilePaths.add(tempFile); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + return contentFilteredFilePaths; + } + + private Optional createDebugBundle(List filePaths) { + ByteArrayOutputStream byteOutputStream = new ByteArrayOutputStream(); + try (GzipCompressorOutputStream gzipCompressorOutputStream = new GzipCompressorOutputStream(byteOutputStream); + TarArchiveOutputStream tarOutputStream = new TarArchiveOutputStream(gzipCompressorOutputStream)) { + for (Path filePath : filePaths) { + TarArchiveEntry tarArchiveEntry = new TarArchiveEntry(filePath.toFile(), filePath.getFileName().toString()); + tarOutputStream.putArchiveEntry(tarArchiveEntry); + copy(filePath, tarOutputStream); + tarOutputStream.closeArchiveEntry(); + } + tarOutputStream.finish(); + } catch (Exception e) { + LOG.error("Error during create compressed bundle", e); + return empty(); + } finally { + closeQuietly(byteOutputStream); + } + return Optional.of(byteOutputStream).map(ByteArrayOutputStream::toByteArray); + } + + private void cleanup(List paths) { + paths.stream() + .findFirst() + .map(Path::getParent) + .ifPresent(basePath -> { + try (Stream walk = walk(basePath)) { + walk.map(Path::toFile).forEach(File::delete); + } catch (IOException e) { + LOG.warn("Unable to clean up temporary directory {}", basePath, e); + } + }); + } +} diff --git a/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/DebugOperationHandlerTest.java b/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/DebugOperationHandlerTest.java new file mode 100644 index 0000000000..c28baee247 --- /dev/null +++ b/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/DebugOperationHandlerTest.java @@ -0,0 +1,251 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.c2.client.service.operation; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static java.nio.file.Files.write; +import static java.util.Collections.emptyList; +import static java.util.Collections.singletonList; +import static java.util.Collections.singletonMap; +import static java.util.function.Function.identity; +import static java.util.stream.Collectors.joining; +import static java.util.stream.Collectors.toList; +import static java.util.stream.Collectors.toMap; +import static org.apache.nifi.c2.client.service.operation.DebugOperationHandler.NEW_LINE; +import static org.apache.nifi.c2.client.service.operation.DebugOperationHandler.TARGET_ARG; +import static org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.FULLY_APPLIED; +import static org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.NOT_APPLIED; +import static org.apache.nifi.c2.protocol.api.OperandType.DEBUG; +import static org.apache.nifi.c2.protocol.api.OperationType.TRANSFER; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + +import java.io.BufferedReader; +import java.io.ByteArrayInputStream; +import java.io.File; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.UncheckedIOException; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Predicate; +import java.util.stream.Stream; +import org.apache.commons.compress.archivers.tar.TarArchiveEntry; +import org.apache.commons.compress.archivers.tar.TarArchiveInputStream; +import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream; +import org.apache.nifi.c2.client.api.C2Client; +import org.apache.nifi.c2.protocol.api.C2Operation; +import org.apache.nifi.c2.protocol.api.C2OperationAck; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +public class DebugOperationHandlerTest { + + private static final String OPERATION_ID = "operationId"; + private static final String C2_DEBUG_UPLOAD_ENDPOINT = "https://host/c2/api/upload"; + private static final String DEFAULT_FILE_CONTENT = "some_textual_data"; + private static final List VALID_BUNDLE_FILE_LIST = singletonList(Paths.get("path_to_file")); + private static final Predicate DEFAULT_CONTENT_FILTER = text -> true; + + @Mock + private C2Client c2Client; + + @TempDir + private File tempDir; + + private static Stream invalidConstructorArguments() { + C2Client mockC2Client = mock(C2Client.class); + return Stream.of( + Arguments.of(null, null, null), + Arguments.of(null, VALID_BUNDLE_FILE_LIST, DEFAULT_CONTENT_FILTER), + Arguments.of(mockC2Client, null, DEFAULT_CONTENT_FILTER), + Arguments.of(mockC2Client, emptyList(), DEFAULT_CONTENT_FILTER), + Arguments.of(mockC2Client, VALID_BUNDLE_FILE_LIST, null) + ); + } + + @ParameterizedTest(name = "c2Client={0} bundleFileList={1} contentFilter={2}") + @MethodSource("invalidConstructorArguments") + public void testAttemptingCreateWithInvalidParametersWillThrowException(C2Client c2Client, List bundleFilePaths, Predicate contentFilter) { + assertThrows(IllegalArgumentException.class, () -> DebugOperationHandler.create(c2Client, bundleFilePaths, contentFilter)); + } + + @Test + public void testOperationAndOperandTypesAreMatching() { + // given + DebugOperationHandler testHandler = DebugOperationHandler.create(c2Client, VALID_BUNDLE_FILE_LIST, DEFAULT_CONTENT_FILTER); + + // when + then + assertEquals(TRANSFER, testHandler.getOperationType()); + assertEquals(DEBUG, testHandler.getOperandType()); + } + + @Test + public void testC2CallbackUrlIsNullInArgs() { + // given + DebugOperationHandler testHandler = DebugOperationHandler.create(c2Client, VALID_BUNDLE_FILE_LIST, DEFAULT_CONTENT_FILTER); + C2Operation c2Operation = operation(null); + + // when + C2OperationAck result = testHandler.handle(c2Operation); + + // then + assertEquals(OPERATION_ID, result.getOperationId()); + assertEquals(NOT_APPLIED, result.getOperationState().getState()); + } + + @Test + public void testFilesAreCollectedAndUploadedAsATarGzBundle() { + // given + Map bundleFileNamesWithContents = Stream.of("file.log", "application.conf", "default.properties") + .collect(toMap(identity(), __ -> DEFAULT_FILE_CONTENT)); + List createBundleFiles = bundleFileNamesWithContents.entrySet().stream() + .map(entry -> placeFileWithContent(entry.getKey(), entry.getValue())) + .collect(toList()); + DebugOperationHandler testHandler = DebugOperationHandler.create(c2Client, createBundleFiles, DEFAULT_CONTENT_FILTER); + C2Operation c2Operation = operation(C2_DEBUG_UPLOAD_ENDPOINT); + + // when + C2OperationAck result = testHandler.handle(c2Operation); + + // then + ArgumentCaptor uploadUrlCaptor = ArgumentCaptor.forClass(String.class); + ArgumentCaptor uploadBundleCaptor = ArgumentCaptor.forClass(byte[].class); + verify(c2Client).uploadBundle(uploadUrlCaptor.capture(), uploadBundleCaptor.capture()); + assertEquals(OPERATION_ID, result.getOperationId()); + assertEquals(FULLY_APPLIED, result.getOperationState().getState()); + assertEquals(C2_DEBUG_UPLOAD_ENDPOINT, uploadUrlCaptor.getValue()); + Map resultBundle = extractBundle(uploadBundleCaptor.getValue()); + assertTrue(mapEqual(bundleFileNamesWithContents, resultBundle)); + } + + @Test + public void testFileToCollectDoesNotExist() { + // given + DebugOperationHandler testHandler = DebugOperationHandler.create(c2Client, singletonList(Paths.get(tempDir.getAbsolutePath(), "missing_file")), DEFAULT_CONTENT_FILTER); + C2Operation c2Operation = operation(C2_DEBUG_UPLOAD_ENDPOINT); + + // when + C2OperationAck result = testHandler.handle(c2Operation); + + // then + assertEquals(OPERATION_ID, result.getOperationId()); + assertEquals(NOT_APPLIED, result.getOperationState().getState()); + } + + private static Stream contentFilterArguments() { + String filterKeyword = "minifi"; + return Stream.of( + Arguments.of( + "files_containing_keyword_filtered_out.file", + filterKeyword, + Stream.of("line one", "line two " + filterKeyword, filterKeyword + "line three", "line four", "line " + filterKeyword + " five").collect(joining(NEW_LINE)), + Stream.of("line one", "line four").collect(joining(NEW_LINE))), + Arguments.of( + "all_content_filtered_out.file", + filterKeyword, + Stream.of("line one " + filterKeyword, filterKeyword, filterKeyword + "line three", + filterKeyword + "line four" + filterKeyword, "line " + filterKeyword + " five").collect(joining(NEW_LINE)), + ""), + Arguments.of( + "all_content_kept.file", + filterKeyword, + Stream.of("line one", "line two", "line three", "line four", "line five").collect(joining(NEW_LINE)), + Stream.of("line one", "line two", "line three", "line four", "line five").collect(joining(NEW_LINE))) + ); + } + + @ParameterizedTest(name = "file={0}") + @MethodSource("contentFilterArguments") + public void testContentIsFilteredOut(String fileName, String filterKeyword, String inputContent, String expectedContent) { + // given + Path bundleFile = placeFileWithContent(fileName, inputContent); + Predicate testContentFilter = content -> !content.contains(filterKeyword); + DebugOperationHandler testHandler = DebugOperationHandler.create(c2Client, singletonList(bundleFile), testContentFilter); + C2Operation c2Operation = operation(C2_DEBUG_UPLOAD_ENDPOINT); + + // when + C2OperationAck result = testHandler.handle(c2Operation); + + // then + ArgumentCaptor uploadUrlCaptor = ArgumentCaptor.forClass(String.class); + ArgumentCaptor uploadBundleCaptor = ArgumentCaptor.forClass(byte[].class); + verify(c2Client).uploadBundle(uploadUrlCaptor.capture(), uploadBundleCaptor.capture()); + assertEquals(OPERATION_ID, result.getOperationId()); + assertEquals(FULLY_APPLIED, result.getOperationState().getState()); + assertEquals(C2_DEBUG_UPLOAD_ENDPOINT, uploadUrlCaptor.getValue()); + Map resultBundle = extractBundle(uploadBundleCaptor.getValue()); + assertEquals(1, resultBundle.size()); + assertEquals(expectedContent, resultBundle.get(fileName)); + } + + private Path placeFileWithContent(String fileName, String content) { + Path filePath = Paths.get(tempDir.getAbsolutePath(), fileName); + try { + write(filePath, content.getBytes(UTF_8)); + } catch (IOException e) { + throw new UncheckedIOException("Failed to write file to temp directory", e); + } + return filePath; + } + + private C2Operation operation(String uploadUrl) { + C2Operation c2Operation = new C2Operation(); + c2Operation.setIdentifier(OPERATION_ID); + c2Operation.setArgs(singletonMap(TARGET_ARG, uploadUrl)); + return c2Operation; + } + + private Map extractBundle(byte[] bundle) { + Map fileNamesWithContents = new HashMap<>(); + try (TarArchiveInputStream tarInputStream = new TarArchiveInputStream(new GzipCompressorInputStream(new ByteArrayInputStream(bundle)))) { + TarArchiveEntry currentEntry; + while ((currentEntry = tarInputStream.getNextTarEntry()) != null) { + fileNamesWithContents.put( + currentEntry.getName(), + new BufferedReader(new InputStreamReader(tarInputStream)).lines().collect(joining(NEW_LINE))); + } + } catch (IOException e) { + throw new UncheckedIOException("Failed to extract bundle", e); + } + return fileNamesWithContents; + } + + private boolean mapEqual(Map first, Map second) { + if (first.size() != second.size()) { + return false; + } + return first.entrySet() + .stream() + .allMatch(e -> e.getValue().equals(second.get(e.getKey()))); + } +} diff --git a/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/service/MiNiFiExecCommandProvider.java b/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/service/MiNiFiExecCommandProvider.java index dcf8641002..7c52a99c68 100644 --- a/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/service/MiNiFiExecCommandProvider.java +++ b/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/service/MiNiFiExecCommandProvider.java @@ -31,8 +31,19 @@ import org.apache.nifi.bootstrap.util.OSUtils; public class MiNiFiExecCommandProvider { + public static final String LOG_DIR = "org.apache.nifi.minifi.bootstrap.config.log.dir"; + public static final String DEFAULT_LOG_DIR = "./logs"; + + public static final String APP_LOG_FILE_NAME = "org.apache.nifi.minifi.bootstrap.config.log.app.file.name"; + public static final String APP_LOG_FILE_EXTENSION = "org.apache.nifi.minifi.bootstrap.config.log.app.file.extension"; + public static final String BOOTSTRAP_LOG_FILE_NAME = "org.apache.nifi.minifi.bootstrap.config.log.bootstrap.file.name"; + public static final String BOOTSTRAP_LOG_FILE_EXTENSION = "org.apache.nifi.minifi.bootstrap.config.log.bootstrap.file.extension"; + public static final String DEFAULT_APP_LOG_FILE_NAME = "minifi-app"; + public static final String DEFAULT_BOOTSTRAP_LOG_FILE_NAME = "minifi-bootstrap"; + public static final String DEFAULT_LOG_FILE_EXTENSION = "log"; + + private static final String DEFAULT_JAVA_CMD = "java"; - private static final String DEFAULT_LOG_DIR = "./logs"; private static final String DEFAULT_LIB_DIR = "./lib"; private static final String DEFAULT_CONF_DIR = "./conf"; private static final String DEFAULT_CONFIG_FILE = DEFAULT_CONF_DIR + "/bootstrap.conf"; @@ -56,7 +67,11 @@ public class MiNiFiExecCommandProvider { Properties props = bootstrapFileProvider.getBootstrapProperties(); File confDir = getFile(props.getProperty(CONF_DIR_KEY, DEFAULT_CONF_DIR).trim(), workingDir); File libDir = getFile(props.getProperty("lib.dir", DEFAULT_LIB_DIR).trim(), workingDir); - String minifiLogDir = System.getProperty("org.apache.nifi.minifi.bootstrap.config.log.dir", DEFAULT_LOG_DIR).trim(); + String minifiLogDir = System.getProperty(LOG_DIR, DEFAULT_LOG_DIR).trim(); + String minifiAppLogFileName = System.getProperty(APP_LOG_FILE_NAME, DEFAULT_APP_LOG_FILE_NAME).trim(); + String minifiAppLogFileExtension = System.getProperty(APP_LOG_FILE_EXTENSION, DEFAULT_LOG_FILE_EXTENSION).trim(); + String minifiBootstrapLogFileName = System.getProperty(BOOTSTRAP_LOG_FILE_NAME, DEFAULT_BOOTSTRAP_LOG_FILE_NAME).trim(); + String minifiBootstrapLogFileExtension = System.getProperty(BOOTSTRAP_LOG_FILE_EXTENSION, DEFAULT_LOG_FILE_EXTENSION).trim(); List cmd = new ArrayList<>(); cmd.add(getJavaCommand(props)); @@ -66,7 +81,11 @@ public class MiNiFiExecCommandProvider { cmd.add("-Dnifi.properties.file.path=" + getMiNiFiPropsFileName(props, confDir)); cmd.add("-Dnifi.bootstrap.listen.port=" + listenPort); cmd.add("-Dapp=MiNiFi"); - cmd.add("-Dorg.apache.nifi.minifi.bootstrap.config.log.dir=" + minifiLogDir); + cmd.add("-D" + LOG_DIR + "=" + minifiLogDir); + cmd.add("-D" + APP_LOG_FILE_NAME + "=" + minifiAppLogFileName); + cmd.add("-D" + APP_LOG_FILE_EXTENSION + "=" + minifiAppLogFileExtension); + cmd.add("-D" + BOOTSTRAP_LOG_FILE_NAME + "=" + minifiBootstrapLogFileName); + cmd.add("-D" + BOOTSTRAP_LOG_FILE_EXTENSION + "=" + minifiBootstrapLogFileExtension); cmd.add("org.apache.nifi.minifi.MiNiFi"); return cmd; diff --git a/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformer.java b/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformer.java index a9410039e4..2251e6a84a 100644 --- a/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformer.java +++ b/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformer.java @@ -19,6 +19,15 @@ package org.apache.nifi.minifi.bootstrap.util; import static org.apache.nifi.minifi.bootstrap.configuration.ingestors.PullHttpChangeIngestor.OVERRIDE_SECURITY; import static org.apache.nifi.minifi.bootstrap.configuration.ingestors.PullHttpChangeIngestor.PULL_HTTP_BASE_KEY; +import static org.apache.nifi.minifi.bootstrap.service.MiNiFiExecCommandProvider.APP_LOG_FILE_EXTENSION; +import static org.apache.nifi.minifi.bootstrap.service.MiNiFiExecCommandProvider.APP_LOG_FILE_NAME; +import static org.apache.nifi.minifi.bootstrap.service.MiNiFiExecCommandProvider.BOOTSTRAP_LOG_FILE_EXTENSION; +import static org.apache.nifi.minifi.bootstrap.service.MiNiFiExecCommandProvider.BOOTSTRAP_LOG_FILE_NAME; +import static org.apache.nifi.minifi.bootstrap.service.MiNiFiExecCommandProvider.DEFAULT_APP_LOG_FILE_NAME; +import static org.apache.nifi.minifi.bootstrap.service.MiNiFiExecCommandProvider.DEFAULT_BOOTSTRAP_LOG_FILE_NAME; +import static org.apache.nifi.minifi.bootstrap.service.MiNiFiExecCommandProvider.DEFAULT_LOG_DIR; +import static org.apache.nifi.minifi.bootstrap.service.MiNiFiExecCommandProvider.DEFAULT_LOG_FILE_EXTENSION; +import static org.apache.nifi.minifi.bootstrap.service.MiNiFiExecCommandProvider.LOG_DIR; import java.io.ByteArrayInputStream; import java.io.FileOutputStream; @@ -47,8 +56,10 @@ import javax.xml.transform.stream.StreamResult; import org.apache.commons.io.input.TeeInputStream; import org.apache.commons.io.output.ByteArrayOutputStream; import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.minifi.bootstrap.RunMiNiFi; import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeException; import org.apache.nifi.minifi.bootstrap.exception.InvalidConfigurationException; +import org.apache.nifi.minifi.bootstrap.service.BootstrapFileProvider; import org.apache.nifi.minifi.commons.schema.ComponentStatusRepositorySchema; import org.apache.nifi.minifi.commons.schema.ConfigSchema; import org.apache.nifi.minifi.commons.schema.ConnectionSchema; @@ -84,13 +95,20 @@ import org.w3c.dom.Document; import org.w3c.dom.Element; public final class ConfigTransformer { + // Underlying version of NIFI will be using + public static final String ROOT_GROUP = "Root-Group"; + + static final String MINIFI_CONFIG_FILE_PATH = "nifi.minifi.config.file"; + static final String MINIFI_BOOTSTRAP_FILE_PATH = "nifi.minifi.bootstrap.file"; + static final String MINIFI_LOG_DIRECTORY = "nifi.minifi.log.directory"; + static final String MINIFI_APP_LOG_FILE = "nifi.minifi.app.log.file"; + static final String MINIFI_BOOTSTRAP_LOG_FILE = "nifi.minifi.bootstrap.log.file"; + + private static final Logger LOGGER = LoggerFactory.getLogger(ConfigTransformer.class); + private static final String OVERRIDE_CORE_PROPERTIES_KEY = PULL_HTTP_BASE_KEY + ".override.core"; private static final Base64.Encoder KEY_ENCODER = Base64.getEncoder().withoutPadding(); private static final int SENSITIVE_PROPERTIES_KEY_LENGTH = 24; - private static final Logger LOGGER = LoggerFactory.getLogger(ConfigTransformer.class); - - // Underlying version of NIFI will be using - public static final String ROOT_GROUP = "Root-Group"; // Final util classes should have private constructor private ConfigTransformer() { @@ -147,6 +165,17 @@ public final class ConfigTransformer { .filter(entry -> ((String) entry.getKey()).startsWith("c2")) .forEach(entry -> configSchemaNew.getNifiPropertiesOverrides().putIfAbsent((String) entry.getKey(), (String) entry.getValue())); + // Config files and log files + if (bootstrapProperties != null) { + configSchemaNew.getNifiPropertiesOverrides().putIfAbsent(MINIFI_CONFIG_FILE_PATH, bootstrapProperties.getProperty(RunMiNiFi.MINIFI_CONFIG_FILE_KEY)); + } + configSchemaNew.getNifiPropertiesOverrides().putIfAbsent(MINIFI_BOOTSTRAP_FILE_PATH, BootstrapFileProvider.getBootstrapConfFile().getAbsolutePath()); + configSchemaNew.getNifiPropertiesOverrides().putIfAbsent(MINIFI_LOG_DIRECTORY, System.getProperty(LOG_DIR, DEFAULT_LOG_DIR).trim()); + configSchemaNew.getNifiPropertiesOverrides().putIfAbsent(MINIFI_APP_LOG_FILE, + System.getProperty(APP_LOG_FILE_NAME, DEFAULT_APP_LOG_FILE_NAME).trim() + "." + System.getProperty(APP_LOG_FILE_EXTENSION, DEFAULT_LOG_FILE_EXTENSION).trim()); + configSchemaNew.getNifiPropertiesOverrides().putIfAbsent(MINIFI_BOOTSTRAP_LOG_FILE, + System.getProperty(BOOTSTRAP_LOG_FILE_NAME, DEFAULT_BOOTSTRAP_LOG_FILE_NAME).trim() + "." + System.getProperty(BOOTSTRAP_LOG_FILE_EXTENSION, DEFAULT_LOG_FILE_EXTENSION).trim()); + // Create nifi.properties and flow.xml.gz in memory ByteArrayOutputStream nifiPropertiesOutputStream = new ByteArrayOutputStream(); writeNiFiProperties(configSchemaNew, nifiPropertiesOutputStream); diff --git a/minifi/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformerTest.java b/minifi/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformerTest.java index 6c9e2b97a7..d25c98d6a1 100644 --- a/minifi/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformerTest.java +++ b/minifi/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformerTest.java @@ -65,6 +65,11 @@ import java.util.Set; import java.util.stream.Collectors; import java.util.zip.GZIPInputStream; +import static org.apache.nifi.minifi.bootstrap.util.ConfigTransformer.MINIFI_APP_LOG_FILE; +import static org.apache.nifi.minifi.bootstrap.util.ConfigTransformer.MINIFI_BOOTSTRAP_FILE_PATH; +import static org.apache.nifi.minifi.bootstrap.util.ConfigTransformer.MINIFI_BOOTSTRAP_LOG_FILE; +import static org.apache.nifi.minifi.bootstrap.util.ConfigTransformer.MINIFI_CONFIG_FILE_PATH; +import static org.apache.nifi.minifi.bootstrap.util.ConfigTransformer.MINIFI_LOG_DIRECTORY; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertNull; @@ -491,6 +496,28 @@ public class ConfigTransformerTest { } + @Test + public void checkLogAndConfigFilePathOverrides() throws Exception { + File inputFile = new File("./src/test/resources/config-minimal.yml"); + Properties bootstrapProperties = getTestBootstrapProperties("bootstrap.conf"); + ConfigTransformer.transformConfigFile(new FileInputStream(inputFile), "./target/", bootstrapProperties); + + File nifiPropertiesFile = new File("./target/nifi.properties"); + + assertTrue(nifiPropertiesFile.exists()); + assertTrue(nifiPropertiesFile.canRead()); + + Properties nifiProperties = new Properties(); + nifiProperties.load(new FileInputStream(nifiPropertiesFile)); + + // an absolute path is returned for bootstrap which is different per every environment. only testing that path ends with expected relative path + assertTrue(nifiProperties.getProperty(MINIFI_BOOTSTRAP_FILE_PATH).endsWith("bootstrap.conf")); + assertEquals("./conf/config.yml", nifiProperties.getProperty(MINIFI_CONFIG_FILE_PATH)); + assertEquals("./logs", nifiProperties.getProperty(MINIFI_LOG_DIRECTORY)); + assertEquals("minifi-app.log", nifiProperties.getProperty(MINIFI_APP_LOG_FILE)); + assertEquals("minifi-bootstrap.log", nifiProperties.getProperty(MINIFI_BOOTSTRAP_LOG_FILE)); + } + public void testConfigFileTransform(String configFile) throws Exception { ConfigSchema configSchema = SchemaLoader.loadConfigSchemaFromYaml(ConfigTransformerTest.class.getClassLoader().getResourceAsStream(configFile)); diff --git a/minifi/minifi-bootstrap/src/test/resources/bootstrap.conf b/minifi/minifi-bootstrap/src/test/resources/bootstrap.conf new file mode 100644 index 0000000000..b4aae7f6d2 --- /dev/null +++ b/minifi/minifi-bootstrap/src/test/resources/bootstrap.conf @@ -0,0 +1,131 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Java command to use when running MiNiFi +java=java + +# Username to use when running MiNiFi. This value will be ignored on Windows. +run.as= + +# Configure where MiNiFi's lib and conf directories live +# When running as a Windows service set full paths instead of relative paths +lib.dir=./lib +conf.dir=./conf + +# How long to wait after telling MiNiFi to shutdown before explicitly killing the Process +graceful.shutdown.seconds=20 + +# The location for the configuration file +# When running as a Windows service use the full path to the file +nifi.minifi.config=./conf/config.yml + +# Security Properties # +# These properties take precedence over any equivalent properties specified in config.yml # +nifi.minifi.security.keystore= +nifi.minifi.security.keystoreType=jks +nifi.minifi.security.keystorePasswd= +nifi.minifi.security.keyPasswd= +nifi.minifi.security.truststore= +nifi.minifi.security.truststoreType= +nifi.minifi.security.truststorePasswd= +nifi.minifi.security.ssl.protocol= + +nifi.minifi.sensitive.props.key= +nifi.minifi.sensitive.props.algorithm= + +# Provenance Reporting Properties # +# These properties take precedence over any equivalent properties specified in the config.yml # +nifi.minifi.provenance.reporting.comment= +nifi.minifi.provenance.reporting.scheduling.strategy= +nifi.minifi.provenance.reporting.scheduling.period= +nifi.minifi.provenance.reporting.destination.url= +nifi.minifi.provenance.reporting.input.port.name= +nifi.minifi.provenance.reporting.instance.url= +nifi.minifi.provenance.reporting.batch.size= +nifi.minifi.provenance.reporting.communications.timeout= + +# Ignore custom SSL controller services and use parent minifi SSL +nifi.minifi.flow.use.parent.ssl=true + +# Notifiers to use for the associated agent, comma separated list of class names +#nifi.minifi.notifier.ingestors=org.apache.nifi.minifi.bootstrap.configuration.ingestors.FileChangeIngestor +#nifi.minifi.notifier.ingestors=org.apache.nifi.minifi.bootstrap.configuration.ingestors.RestChangeIngestor +#nifi.minifi.notifier.ingestors=org.apache.nifi.minifi.bootstrap.configuration.ingestors.PullHttpChangeIngestor + +# File change notifier configuration + +# Path of the file to monitor for changes. When these occur, the FileChangeNotifier, if configured, will begin the configuration reloading process +#nifi.minifi.notifier.ingestors.file.config.path= +# How frequently the file specified by 'nifi.minifi.notifier.file.config.path' should be evaluated for changes. +#nifi.minifi.notifier.ingestors.file.polling.period.seconds=5 + +# Rest change notifier configuration + +# Port on which the Jetty server will bind to, keep commented for a random open port +#nifi.minifi.notifier.ingestors.receive.http.port=8338 + +#Pull HTTP change notifier configuration + +# Hostname on which to pull configurations from +#nifi.minifi.notifier.ingestors.pull.http.hostname=localhost +# Port on which to pull configurations from +#nifi.minifi.notifier.ingestors.pull.http.port=4567 +# Path to pull configurations from +#nifi.minifi.notifier.ingestors.pull.http.path=/c2/config +# Query string to pull configurations with +#nifi.minifi.notifier.ingestors.pull.http.query=class=raspi3 +# Period on which to pull configurations from, defaults to 5 minutes if commented out +#nifi.minifi.notifier.ingestors.pull.http.period.ms=300000 + +# Periodic Status Reporters to use for the associated agent, comma separated list of class names +#nifi.minifi.status.reporter.components=org.apache.nifi.minifi.bootstrap.status.reporters.StatusLogger + +# Periodic Status Logger configuration + +# The FlowStatus query to submit to the MiNiFi instance +#nifi.minifi.status.reporter.log.query=instance:health,bulletins +# The log level at which the status will be logged +#nifi.minifi.status.reporter.log.level=INFO +# The period (in milliseconds) at which to log the status +#nifi.minifi.status.reporter.log.period=60000 + +# Disable JSR 199 so that we can use JSP's without running a JDK +java.arg.1=-Dorg.apache.jasper.compiler.disablejsr199=true + +# JVM memory settings +java.arg.2=-Xms256m +java.arg.3=-Xmx256m + +# Enable Remote Debugging +#java.arg.debug=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=8000 + +java.arg.4=-Djava.net.preferIPv4Stack=true + +# allowRestrictedHeaders is required for Cluster/Node communications to work properly +java.arg.5=-Dsun.net.http.allowRestrictedHeaders=true +java.arg.6=-Djava.protocol.handler.pkgs=sun.net.www.protocol + +# Sets the provider of SecureRandom to /dev/urandom to prevent blocking on VMs +java.arg.7=-Djava.security.egd=file:/dev/urandom + + +# The G1GC is still considered experimental but has proven to be very advantageous in providing great +# performance without significant "stop-the-world" delays. +#java.arg.13=-XX:+UseG1GC + +#Set headless mode by default +java.arg.14=-Djava.awt.headless=true \ No newline at end of file diff --git a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/bin/minifi-env.bat b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/bin/minifi-env.bat index 8d1c98566d..dfe4202e3b 100644 --- a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/bin/minifi-env.bat +++ b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/bin/minifi-env.bat @@ -26,4 +26,8 @@ rem The directory for the NiFi pid file set MINIFI_PID_DIR=%MINIFI_ROOT%\run rem The directory for NiFi log files -set MINIFI_LOG_DIR=%MINIFI_ROOT%\logs \ No newline at end of file +set MINIFI_LOG_DIR=%MINIFI_ROOT%\logs +set MINIFI_APP_LOG_FILE_NAME="minifi-app" +set MINIFI_APP_LOG_FILE_EXTENSION="log" +set MINIFI_BOOTSTRAP_LOG_FILE_NAME="minifi-bootstrap" +set MINIFI_BOOTSTRAP_LOG_FILE_EXTENSION="log" \ No newline at end of file diff --git a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/bin/minifi-env.sh b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/bin/minifi-env.sh index a9fa1d6762..bb16e3e081 100644 --- a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/bin/minifi-env.sh +++ b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/bin/minifi-env.sh @@ -21,8 +21,14 @@ export MINIFI_HOME=$(cd "${SCRIPT_DIR}" && cd .. && pwd) -#The directory for the NiFi pid file +#The directory for the MiNiFi pid file export MINIFI_PID_DIR="${MINIFI_HOME}/run" -#The directory for NiFi log files -export MINIFI_LOG_DIR="${MINIFI_HOME}/logs" \ No newline at end of file +#The directory for MiNiFi log files +export MINIFI_LOG_DIR="${MINIFI_HOME}/logs" + +# MiNiFi log file names and extensions +export MINIFI_APP_LOG_FILE_NAME=minifi-app +export MINIFI_APP_LOG_FILE_EXTENSION=log +export MINIFI_BOOTSTRAP_LOG_FILE_NAME=minifi-bootstrap +export MINIFI_BOOTSTRAP_LOG_FILE_EXTENSION=log \ No newline at end of file diff --git a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/bin/minifi.sh b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/bin/minifi.sh index 8074dfe93b..5c33b7b2ff 100755 --- a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/bin/minifi.sh +++ b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/bin/minifi.sh @@ -303,7 +303,7 @@ run() { #setup directory parameters - BOOTSTRAP_LOG_PARAMS="-Dorg.apache.nifi.minifi.bootstrap.config.log.dir="\""${MINIFI_LOG_DIR}"\""" + BOOTSTRAP_LOG_PARAMS="-Dorg.apache.nifi.minifi.bootstrap.config.log.dir="\""${MINIFI_LOG_DIR}"\"" -Dorg.apache.nifi.minifi.bootstrap.config.log.app.file.name="\""${MINIFI_APP_LOG_FILE_NAME}"\"" -Dorg.apache.nifi.minifi.bootstrap.config.log.app.file.extension="\""${MINIFI_APP_LOG_FILE_EXTENSION}"\"" -Dorg.apache.nifi.minifi.bootstrap.config.log.bootstrap.file.name="\""${MINIFI_BOOTSTRAP_LOG_FILE_NAME}"\"" -Dorg.apache.nifi.minifi.bootstrap.config.log.bootstrap.file.extension="\""${MINIFI_BOOTSTRAP_LOG_FILE_EXTENSION}"\""" BOOTSTRAP_PID_PARAMS="-Dorg.apache.nifi.minifi.bootstrap.config.pid.dir="\""${MINIFI_PID_DIR}"\""" BOOTSTRAP_CONF_PARAMS="-Dorg.apache.nifi.minifi.bootstrap.config.file="\""${BOOTSTRAP_CONF}"\""" diff --git a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/bin/run-minifi.bat b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/bin/run-minifi.bat index c26c940e23..dc7a3890ce 100644 --- a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/bin/run-minifi.bat +++ b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/bin/run-minifi.bat @@ -42,7 +42,7 @@ set LIB_DIR=lib set CONF_DIR=conf set BOOTSTRAP_CONF_FILE=%CONF_DIR%\bootstrap.conf -set JAVA_ARGS=-Dorg.apache.nifi.minifi.bootstrap.config.log.dir=%MINIFI_LOG_DIR% -Dorg.apache.nifi.minifi.bootstrap.config.pid.dir=%MINIFI_PID_DIR% -Dorg.apache.nifi.minifi.bootstrap.config.file=%BOOTSTRAP_CONF_FILE% +set JAVA_ARGS=-Dorg.apache.nifi.minifi.bootstrap.config.log.dir=%MINIFI_LOG_DIR% -Dorg.apache.nifi.minifi.bootstrap.config.log.app.file.name=%MINIFI_APP_LOG_FILE_NAME% -Dorg.apache.nifi.minifi.bootstrap.config.log.app.file.extension=%MINIFI_APP_LOG_FILE_EXTENSION% -Dorg.apache.nifi.minifi.bootstrap.config.log.bootstrap.file.name=%MINIFI_BOOTSTRAP_LOG_FILE_NAME% -Dorg.apache.nifi.minifi.bootstrap.config.log.bootstrap.file.extension=%MINIFI_BOOTSTRAP_LOG_FILE_EXTENSION% -Dorg.apache.nifi.minifi.bootstrap.config.pid.dir=%MINIFI_PID_DIR% -Dorg.apache.nifi.minifi.bootstrap.config.file=%BOOTSTRAP_CONF_FILE% set JAVA_PARAMS=-cp %CONF_DIR%;%BOOTSTRAP_LIB_DIR%\*;%LIB_DIR%\* -Xms12m -Xmx24m %JAVA_ARGS% org.apache.nifi.minifi.bootstrap.RunMiNiFi set BOOTSTRAP_ACTION=run diff --git a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/conf/logback.xml b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/conf/logback.xml index bbb4831508..2b470d7180 100644 --- a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/conf/logback.xml +++ b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/conf/logback.xml @@ -20,7 +20,7 @@ - ${org.apache.nifi.minifi.bootstrap.config.log.dir}/minifi-app.log + ${org.apache.nifi.minifi.bootstrap.config.log.dir}/${org.apache.nifi.minifi.bootstrap.config.log.app.file.name}.${org.apache.nifi.minifi.bootstrap.config.log.app.file.extension} - ${org.apache.nifi.minifi.bootstrap.config.log.dir}/minifi-app_%d{yyyy-MM-dd_HH}.%i.log.gz + ${org.apache.nifi.minifi.bootstrap.config.log.dir}/${org.apache.nifi.minifi.bootstrap.config.log.app.file.name}_%d{yyyy-MM-dd_HH}.%i.${org.apache.nifi.minifi.bootstrap.config.log.app.file.extension}.gz 10 @@ -43,7 +43,7 @@ - ${org.apache.nifi.minifi.bootstrap.config.log.dir}/minifi-bootstrap.log + ${org.apache.nifi.minifi.bootstrap.config.log.dir}/${org.apache.nifi.minifi.bootstrap.config.log.bootstrap.file.name}.${org.apache.nifi.minifi.bootstrap.config.log.bootstrap.file.extension} - ${org.apache.nifi.minifi.bootstrap.config.log.dir}/minifi-bootstrap_%d.log.gz + ${org.apache.nifi.minifi.bootstrap.config.log.dir}/${org.apache.nifi.minifi.bootstrap.config.log.bootstrap.file.name}_%d.${org.apache.nifi.minifi.bootstrap.config.log.bootstrap.file.extension}.gz 5 diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/c2/C2NifiClientService.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/c2/C2NifiClientService.java index 35d591b192..41db531ef8 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/c2/C2NifiClientService.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/c2/C2NifiClientService.java @@ -14,15 +14,32 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.nifi.c2; +import static java.util.Optional.ofNullable; +import static java.util.stream.Collectors.collectingAndThen; +import static java.util.stream.Collectors.toList; +import static java.util.stream.Collectors.toSet; + import java.io.File; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; +import java.nio.file.Paths; import java.util.Arrays; -import java.util.Optional; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.function.Predicate; +import java.util.stream.Stream; import org.apache.nifi.c2.client.C2ClientConfig; import org.apache.nifi.c2.client.http.C2HttpClient; import org.apache.nifi.c2.client.service.C2ClientService; @@ -30,13 +47,14 @@ import org.apache.nifi.c2.client.service.C2HeartbeatFactory; import org.apache.nifi.c2.client.service.FlowIdHolder; import org.apache.nifi.c2.client.service.model.RuntimeInfoWrapper; import org.apache.nifi.c2.client.service.operation.C2OperationService; +import org.apache.nifi.c2.client.service.operation.DebugOperationHandler; import org.apache.nifi.c2.client.service.operation.DescribeManifestOperationHandler; import org.apache.nifi.c2.client.service.operation.UpdateConfigurationOperationHandler; import org.apache.nifi.c2.protocol.api.AgentRepositories; import org.apache.nifi.c2.protocol.api.AgentRepositoryStatus; import org.apache.nifi.c2.protocol.api.FlowQueueStatus; import org.apache.nifi.c2.serializer.C2JacksonSerializer; -import org.apache.nifi.controller.FlowController;; +import org.apache.nifi.controller.FlowController; import org.apache.nifi.controller.status.ConnectionStatus; import org.apache.nifi.controller.status.ProcessGroupStatus; import org.apache.nifi.diagnostics.StorageUsage; @@ -51,16 +69,26 @@ import org.apache.nifi.util.NiFiProperties; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Collection; -import java.util.HashMap; -import java.util.Iterator; -import java.util.Map; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.TimeUnit; - public class C2NifiClientService { + public static final Set SENSITIVE_PROPERTY_KEYWORDS = + Stream.of("key:", "algorithm:", "secret.key", "sensitive.props.key", "sensitive.props.algorithm", "secret", "password", "passwd") + .map(String::toLowerCase) + .collect(collectingAndThen(toSet(), Collections::unmodifiableSet)); + public static final Predicate EXCLUDE_SENSITIVE_TEXT = text -> + ofNullable(text) + .map(String::toLowerCase) + .map(t -> SENSITIVE_PROPERTY_KEYWORDS.stream().noneMatch(keyword -> t.contains(keyword))) + .orElse(true); + + private static final String MINIFI_CONFIG_FILE_PATH = "nifi.minifi.config.file"; + private static final String MINIFI_BOOTSTRAP_FILE_PATH = "nifi.minifi.bootstrap.file"; + private static final String MINIFI_LOG_DIRECTORY = "nifi.minifi.log.directory"; + private static final String MINIFI_APP_LOG_FILE = "nifi.minifi.app.log.file"; + private static final String MINIFI_BOOTSTRAP_LOG_FILE = "nifi.minifi.bootstrap.log.file"; + private static final Logger logger = LoggerFactory.getLogger(C2NifiClientService.class); + private static final String DEFAULT_CONF_DIR = "./conf"; private static final String TARGET_CONFIG_FILE = "/config-new.yml"; private static final String ROOT_GROUP_ID = "root"; @@ -96,37 +124,38 @@ public class C2NifiClientService { heartbeatFactory, new C2OperationService(Arrays.asList( new UpdateConfigurationOperationHandler(client, flowIdHolder, this::updateFlowContent), - new DescribeManifestOperationHandler(heartbeatFactory, this::generateRuntimeInfo) + new DescribeManifestOperationHandler(heartbeatFactory, this::generateRuntimeInfo), + DebugOperationHandler.create(client, debugBundleFiles(niFiProperties), EXCLUDE_SENSITIVE_TEXT) )) ); } private C2ClientConfig generateClientConfig(NiFiProperties properties) { return new C2ClientConfig.Builder() - .agentClass(properties.getProperty(C2NiFiProperties.C2_AGENT_CLASS_KEY, "")) - .agentIdentifier(properties.getProperty(C2NiFiProperties.C2_AGENT_IDENTIFIER_KEY)) - .fullHeartbeat(Boolean.parseBoolean(properties.getProperty(C2NiFiProperties.C2_FULL_HEARTBEAT_KEY, "true"))) - .heartbeatPeriod(Long.parseLong(properties.getProperty(C2NiFiProperties.C2_AGENT_HEARTBEAT_PERIOD_KEY, - String.valueOf(C2NiFiProperties.C2_AGENT_DEFAULT_HEARTBEAT_PERIOD)))) - .connectTimeout((long) FormatUtils.getPreciseTimeDuration(properties.getProperty(C2NiFiProperties.C2_CONNECTION_TIMEOUT, - C2NiFiProperties.C2_DEFAULT_CONNECTION_TIMEOUT), TimeUnit.MILLISECONDS)) - .readTimeout((long) FormatUtils.getPreciseTimeDuration(properties.getProperty(C2NiFiProperties.C2_READ_TIMEOUT, - C2NiFiProperties.C2_DEFAULT_READ_TIMEOUT), TimeUnit.MILLISECONDS)) - .callTimeout((long) FormatUtils.getPreciseTimeDuration(properties.getProperty(C2NiFiProperties.C2_CALL_TIMEOUT, - C2NiFiProperties.C2_DEFAULT_CALL_TIMEOUT), TimeUnit.MILLISECONDS)) - .c2Url(properties.getProperty(C2NiFiProperties.C2_REST_URL_KEY, "")) - .c2RequestCompression(properties.getProperty(C2NiFiProperties.C2_REQUEST_COMPRESSION_KEY, C2NiFiProperties.C2_REQUEST_COMPRESSION)) + .agentClass(properties.getProperty(C2NiFiProperties.C2_AGENT_CLASS_KEY, "")) + .agentIdentifier(properties.getProperty(C2NiFiProperties.C2_AGENT_IDENTIFIER_KEY)) + .fullHeartbeat(Boolean.parseBoolean(properties.getProperty(C2NiFiProperties.C2_FULL_HEARTBEAT_KEY, "true"))) + .heartbeatPeriod(Long.parseLong(properties.getProperty(C2NiFiProperties.C2_AGENT_HEARTBEAT_PERIOD_KEY, + String.valueOf(C2NiFiProperties.C2_AGENT_DEFAULT_HEARTBEAT_PERIOD)))) + .connectTimeout((long) FormatUtils.getPreciseTimeDuration(properties.getProperty(C2NiFiProperties.C2_CONNECTION_TIMEOUT, + C2NiFiProperties.C2_DEFAULT_CONNECTION_TIMEOUT), TimeUnit.MILLISECONDS)) + .readTimeout((long) FormatUtils.getPreciseTimeDuration(properties.getProperty(C2NiFiProperties.C2_READ_TIMEOUT, + C2NiFiProperties.C2_DEFAULT_READ_TIMEOUT), TimeUnit.MILLISECONDS)) + .callTimeout((long) FormatUtils.getPreciseTimeDuration(properties.getProperty(C2NiFiProperties.C2_CALL_TIMEOUT, + C2NiFiProperties.C2_DEFAULT_CALL_TIMEOUT), TimeUnit.MILLISECONDS)) + .c2Url(properties.getProperty(C2NiFiProperties.C2_REST_URL_KEY, "")) + .c2RequestCompression(properties.getProperty(C2NiFiProperties.C2_REQUEST_COMPRESSION_KEY, C2NiFiProperties.C2_REQUEST_COMPRESSION)) .confDirectory(properties.getProperty(C2NiFiProperties.C2_CONFIG_DIRECTORY_KEY, DEFAULT_CONF_DIR)) - .runtimeManifestIdentifier(properties.getProperty(C2NiFiProperties.C2_RUNTIME_MANIFEST_IDENTIFIER_KEY, "")) - .runtimeType(properties.getProperty(C2NiFiProperties.C2_RUNTIME_TYPE_KEY, "")) - .c2AckUrl(properties.getProperty(C2NiFiProperties.C2_REST_URL_ACK_KEY, "")) - .truststoreFilename(properties.getProperty(C2NiFiProperties.TRUSTSTORE_LOCATION_KEY, "")) - .truststorePassword(properties.getProperty(C2NiFiProperties.TRUSTSTORE_PASSWORD_KEY, "")) - .truststoreType(properties.getProperty(C2NiFiProperties.TRUSTSTORE_TYPE_KEY, "JKS")) - .keystoreFilename(properties.getProperty(C2NiFiProperties.KEYSTORE_LOCATION_KEY, "")) - .keystorePassword(properties.getProperty(C2NiFiProperties.KEYSTORE_PASSWORD_KEY, "")) - .keystoreType(properties.getProperty(C2NiFiProperties.KEYSTORE_TYPE_KEY, "JKS")) - .build(); + .runtimeManifestIdentifier(properties.getProperty(C2NiFiProperties.C2_RUNTIME_MANIFEST_IDENTIFIER_KEY, "")) + .runtimeType(properties.getProperty(C2NiFiProperties.C2_RUNTIME_TYPE_KEY, "")) + .c2AckUrl(properties.getProperty(C2NiFiProperties.C2_REST_URL_ACK_KEY, "")) + .truststoreFilename(properties.getProperty(C2NiFiProperties.TRUSTSTORE_LOCATION_KEY, "")) + .truststorePassword(properties.getProperty(C2NiFiProperties.TRUSTSTORE_PASSWORD_KEY, "")) + .truststoreType(properties.getProperty(C2NiFiProperties.TRUSTSTORE_TYPE_KEY, "JKS")) + .keystoreFilename(properties.getProperty(C2NiFiProperties.KEYSTORE_LOCATION_KEY, "")) + .keystorePassword(properties.getProperty(C2NiFiProperties.KEYSTORE_PASSWORD_KEY, "")) + .keystoreType(properties.getProperty(C2NiFiProperties.KEYSTORE_TYPE_KEY, "JKS")) + .build(); } public void start() { @@ -209,10 +238,21 @@ public class C2NifiClientService { } private File getTargetConfigFile() { - return Optional.ofNullable(propertiesDir) + return ofNullable(propertiesDir) .map(File::new) .map(File::getParent) .map(parentDir -> new File(parentDir + TARGET_CONFIG_FILE)) - .orElse( new File(DEFAULT_CONF_DIR + TARGET_CONFIG_FILE)); + .orElse(new File(DEFAULT_CONF_DIR + TARGET_CONFIG_FILE)); + } + + private List debugBundleFiles(NiFiProperties properties) { + return Stream.of( + Paths.get(properties.getProperty(MINIFI_CONFIG_FILE_PATH)), + Paths.get(properties.getProperty(MINIFI_BOOTSTRAP_FILE_PATH)), + Paths.get(properties.getProperty(MINIFI_LOG_DIRECTORY), properties.getProperty(MINIFI_APP_LOG_FILE)), + Paths.get(properties.getProperty(MINIFI_LOG_DIRECTORY), properties.getProperty(MINIFI_BOOTSTRAP_LOG_FILE))) + .filter(Files::exists) + .filter(Files::isRegularFile) + .collect(toList()); } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/c2/C2NifiClientServiceTest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/c2/C2NifiClientServiceTest.java new file mode 100644 index 0000000000..8d58ea5b22 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/c2/C2NifiClientServiceTest.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.c2; + +import static org.apache.nifi.c2.C2NifiClientService.EXCLUDE_SENSITIVE_TEXT; +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.util.stream.Stream; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +public class C2NifiClientServiceTest { + + private static Stream textAndExpectedResult() { + return Stream.of( + Arguments.of("nifi.minifi.sensitive.props.key", false), + Arguments.of("nifi.minifi.sensitive.props.algorithm", false), + Arguments.of("key:", false), + Arguments.of("algorithm:", false), + Arguments.of("secret.key", false), + Arguments.of("c2.security.truststore.password", false), + Arguments.of("c2.security.keystore.password", false), + Arguments.of("nifi.minifi.security.keystorePasswd", false), + Arguments.of("nifi.minifi.security.truststorePasswd", false), + Arguments.of("nifi.minifi.security.keystore", true), + Arguments.of("nifi.minifi.security.truststore", true), + Arguments.of("nifi.minifi.flow.use.parent.ssl", true), + Arguments.of("nifi.minifi.status.reporter", true), + Arguments.of("nifi.minifi.security.ssl.protocol", true), + Arguments.of("", true), + Arguments.of(null, true), + Arguments.of("nifi", true) + ); + } + + @ParameterizedTest + @MethodSource("textAndExpectedResult") + public void testSensitiveTextIsExcluded(String propertyName, boolean expectedResult) { + assertEquals(expectedResult, EXCLUDE_SENSITIVE_TEXT.test(propertyName)); + } +}