diff --git a/minifi/minifi-commons/minifi-commons-utils/src/main/java/org/apache/nifi/minifi/commons/utils/RetryUtil.java b/minifi/minifi-commons/minifi-commons-utils/src/main/java/org/apache/nifi/minifi/commons/utils/RetryUtil.java new file mode 100644 index 0000000000..74411aad6e --- /dev/null +++ b/minifi/minifi-commons/minifi-commons-utils/src/main/java/org/apache/nifi/minifi/commons/utils/RetryUtil.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.minifi.commons.utils; + +import java.util.Optional; +import java.util.function.Predicate; +import java.util.function.Supplier; + +public final class RetryUtil { + + private RetryUtil() { + throw new UnsupportedOperationException(); + } + + public static Optional retry(Supplier input, Predicate predicate, int maxRetries, int pauseDurationMillis) { + int retries = 0; + while (true) { + T t = input.get(); + if (predicate.test(t)) { + return Optional.empty(); + } + if (retries == maxRetries) { + return Optional.ofNullable(t); + } + retries++; + try { + Thread.sleep(pauseDurationMillis); + } catch (InterruptedException e) { + } + } + } +} diff --git a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/pom.xml b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/pom.xml index 37ed6dd95f..addc049ebf 100644 --- a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/pom.xml +++ b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/pom.xml @@ -145,6 +145,11 @@ limitations under the License. nifi-framework-core 2.0.0-SNAPSHOT + + org.apache.nifi.minifi + minifi-commons-utils + 2.0.0-SNAPSHOT + org.apache.nifi nifi-python-framework-api diff --git a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/command/DefaultUpdateConfigurationStrategy.java b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/command/DefaultUpdateConfigurationStrategy.java index d5ccac1e53..47504750be 100644 --- a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/command/DefaultUpdateConfigurationStrategy.java +++ b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/command/DefaultUpdateConfigurationStrategy.java @@ -17,15 +17,35 @@ package org.apache.nifi.minifi.c2.command; +import static java.nio.charset.StandardCharsets.UTF_8; +import static java.util.Collections.emptySet; +import static java.util.UUID.randomUUID; +import static java.util.function.Predicate.not; +import static org.apache.nifi.minifi.commons.api.MiNiFiConstants.BACKUP_EXTENSION; +import static org.apache.nifi.minifi.commons.api.MiNiFiConstants.RAW_EXTENSION; +import static org.apache.nifi.minifi.commons.util.FlowUpdateUtils.backup; +import static org.apache.nifi.minifi.commons.util.FlowUpdateUtils.persist; +import static org.apache.nifi.minifi.commons.util.FlowUpdateUtils.removeIfExists; +import static org.apache.nifi.minifi.commons.util.FlowUpdateUtils.revert; +import static org.apache.nifi.minifi.commons.utils.RetryUtil.retry; +import static org.apache.nifi.minifi.validator.FlowValidator.validate; + +import java.io.IOException; +import java.nio.file.Path; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; +import java.util.stream.Collectors; +import java.util.stream.Stream; import org.apache.commons.io.FilenameUtils; import org.apache.nifi.c2.client.service.operation.UpdateConfigurationStrategy; -import org.apache.nifi.components.AsyncLoadedProcessor; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.connectable.Connection; -import org.apache.nifi.controller.ComponentNode; import org.apache.nifi.controller.FlowController; import org.apache.nifi.controller.ProcessorNode; -import org.apache.nifi.controller.flow.FlowManager; import org.apache.nifi.controller.flow.VersionedDataflow; import org.apache.nifi.flow.VersionedConnection; import org.apache.nifi.flow.VersionedProcessGroup; @@ -38,49 +58,12 @@ import org.apache.nifi.services.FlowService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.nio.file.Path; -import java.util.Collection; -import java.util.List; -import java.util.Optional; -import java.util.Set; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.function.Consumer; -import java.util.function.Predicate; -import java.util.function.Supplier; -import java.util.stream.Collectors; -import java.util.stream.Stream; - -import static java.nio.charset.StandardCharsets.UTF_8; -import static java.util.Collections.emptySet; -import static java.util.UUID.randomUUID; -import static java.util.function.Predicate.not; -import static org.apache.nifi.components.AsyncLoadedProcessor.LoadState.DOWNLOADING_DEPENDENCIES; -import static org.apache.nifi.components.AsyncLoadedProcessor.LoadState.INITIALIZING_ENVIRONMENT; -import static org.apache.nifi.components.AsyncLoadedProcessor.LoadState.LOADING_PROCESSOR_CODE; -import static org.apache.nifi.components.validation.ValidationStatus.INVALID; -import static org.apache.nifi.components.validation.ValidationStatus.VALIDATING; -import static org.apache.nifi.minifi.commons.api.MiNiFiConstants.BACKUP_EXTENSION; -import static org.apache.nifi.minifi.commons.api.MiNiFiConstants.RAW_EXTENSION; -import static org.apache.nifi.minifi.commons.util.FlowUpdateUtils.backup; -import static org.apache.nifi.minifi.commons.util.FlowUpdateUtils.persist; -import static org.apache.nifi.minifi.commons.util.FlowUpdateUtils.removeIfExists; -import static org.apache.nifi.minifi.commons.util.FlowUpdateUtils.revert; - public class DefaultUpdateConfigurationStrategy implements UpdateConfigurationStrategy { private static final Logger LOGGER = LoggerFactory.getLogger(DefaultUpdateConfigurationStrategy.class); - private static final Set INITIALIZING_ASYNC_PROCESSOR_STATES = - Set.of(INITIALIZING_ENVIRONMENT, DOWNLOADING_DEPENDENCIES, LOADING_PROCESSOR_CODE); - - private static int ASYNC_LOADING_COMPONENT_INIT_RETRY_PAUSE_DURATION_MS = 5000; - private static int ASYNC_LOADING_COMPONENT_INIT_MAX_RETRIES = 60; - private static int VALIDATION_RETRY_PAUSE_DURATION_MS = 1000; - private static int VALIDATION_MAX_RETRIES = 5; - private static int FLOW_DRAIN_RETRY_PAUSE_DURATION_MS = 1000; - private static int FLOW_DRAIN_MAX_RETRIES = 60; + private static final int FLOW_DRAIN_RETRY_PAUSE_DURATION_MS = 1000; + private static final int FLOW_DRAIN_MAX_RETRIES = 60; private final FlowController flowController; private final FlowService flowService; @@ -115,9 +98,9 @@ public class DefaultUpdateConfigurationStrategy implements UpdateConfigurationSt Set originalConnectionIds = emptySet(); try { originalConnectionIds = findAllExistingConnections(flowController.getFlowManager().getRootGroup()) - .stream() - .map(Connection::getIdentifier) - .collect(Collectors.toSet()); + .stream() + .map(Connection::getIdentifier) + .collect(Collectors.toSet()); VersionedDataflow rawDataFlow = flowSerDeService.deserialize(rawFlow); VersionedDataflow propertyEncryptedRawDataFlow = flowPropertyEncryptor.encryptSensitiveProperties(rawDataFlow); @@ -187,9 +170,9 @@ public class DefaultUpdateConfigurationStrategy implements UpdateConfigurationSt return rootProcessGroup -> { LOGGER.warn("Flow did not stop within graceful period. Force stopping flow and emptying non referenced queues"); findAllExistingConnections(rootProcessGroup).stream() - .filter(connection -> !proposedConnectionIds.contains(connection.getIdentifier())) - .map(Connection::getFlowFileQueue) - .forEach(queue -> queue.dropFlowFiles(randomUUID().toString(), randomUUID().toString())); + .filter(connection -> !proposedConnectionIds.contains(connection.getIdentifier())) + .map(Connection::getFlowFileQueue) + .forEach(queue -> queue.dropFlowFiles(randomUUID().toString(), randomUUID().toString())); }; } @@ -216,84 +199,21 @@ public class DefaultUpdateConfigurationStrategy implements UpdateConfigurationSt } } - private List validate(FlowManager flowManager) { - List componentNodes = extractComponentNodes(flowManager); - - retry(() -> initializingAsyncLoadingComponents(componentNodes), List::isEmpty, - ASYNC_LOADING_COMPONENT_INIT_MAX_RETRIES, ASYNC_LOADING_COMPONENT_INIT_RETRY_PAUSE_DURATION_MS) - .ifPresent(components -> { - LOGGER.error("The following components are async loading components and are still initializing: {}", components); - throw new IllegalStateException("Maximum retry number exceeded while waiting for async loading components to be initialized"); - }); - - retry(() -> componentsInValidatingState(componentNodes), List::isEmpty, VALIDATION_MAX_RETRIES, VALIDATION_RETRY_PAUSE_DURATION_MS) - .ifPresent(components -> { - LOGGER.error("The following components are still in VALIDATING state: {}", components); - throw new IllegalStateException("Maximum retry number exceeded while waiting for components to be validated"); - }); - - return componentNodes.stream() - .map(ComponentNode::getValidationErrors) - .flatMap(Collection::stream) - .toList(); - } - - private List extractComponentNodes(FlowManager flowManager) { - return Stream.of( - flowManager.getAllControllerServices(), - flowManager.getAllReportingTasks(), - Set.copyOf(flowManager.getRootGroup().findAllProcessors())) - .flatMap(Set::stream) - .toList(); - } - - private List componentsInValidatingState(List componentNodes) { - return componentNodes.stream() - .filter(componentNode -> componentNode.performValidation() == VALIDATING) - .toList(); - } - - private List initializingAsyncLoadingComponents(List componentNodes) { - return componentNodes.stream() - .filter(componentNode -> componentNode.performValidation() == INVALID) - .filter(componentNode -> componentNode.getComponent() instanceof AsyncLoadedProcessor asyncLoadedProcessor - && INITIALIZING_ASYNC_PROCESSOR_STATES.contains(asyncLoadedProcessor.getState())) - .toList(); - } - - private Optional retry(Supplier input, Predicate predicate, int maxRetries, int pauseDurationMillis) { - int retries = 0; - while (true) { - T t = input.get(); - if (predicate.test(t)) { - return Optional.empty(); - } - if (retries == maxRetries) { - return Optional.ofNullable(t); - } - retries++; - try { - Thread.sleep(pauseDurationMillis); - } catch (InterruptedException e) { - } - } - } - private Set findAllProposedConnectionIds(VersionedProcessGroup versionedProcessGroup) { return versionedProcessGroup == null - ? emptySet() - : Stream.concat( - versionedProcessGroup.getConnections().stream().map(VersionedConnection::getInstanceIdentifier), - versionedProcessGroup.getProcessGroups().stream().map(this::findAllProposedConnectionIds).flatMap(Set::stream) - ).collect(Collectors.toSet()); + ? emptySet() + : Stream.concat( + versionedProcessGroup.getConnections().stream().map(VersionedConnection::getInstanceIdentifier), + versionedProcessGroup.getProcessGroups().stream().map(this::findAllProposedConnectionIds).flatMap(Set::stream) + ).collect(Collectors.toSet()); } private Set findAllExistingConnections(ProcessGroup processGroup) { return processGroup == null - ? emptySet() - : Stream.concat( - processGroup.getConnections().stream(), - processGroup.getProcessGroups().stream().map(this::findAllExistingConnections).flatMap(Set::stream) - ).collect(Collectors.toSet()); + ? emptySet() + : Stream.concat( + processGroup.getConnections().stream(), + processGroup.getProcessGroups().stream().map(this::findAllExistingConnections).flatMap(Set::stream) + ).collect(Collectors.toSet()); } } diff --git a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/validator/FlowValidator.java b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/validator/FlowValidator.java new file mode 100644 index 0000000000..e479b57dc1 --- /dev/null +++ b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/validator/FlowValidator.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.minifi.validator; + +import static org.apache.nifi.components.AsyncLoadedProcessor.LoadState.DOWNLOADING_DEPENDENCIES; +import static org.apache.nifi.components.AsyncLoadedProcessor.LoadState.INITIALIZING_ENVIRONMENT; +import static org.apache.nifi.components.AsyncLoadedProcessor.LoadState.LOADING_PROCESSOR_CODE; +import static org.apache.nifi.components.validation.ValidationStatus.INVALID; +import static org.apache.nifi.components.validation.ValidationStatus.VALIDATING; +import static org.apache.nifi.minifi.commons.utils.RetryUtil.retry; + +import java.util.Collection; +import java.util.List; +import java.util.Set; +import java.util.stream.Stream; +import org.apache.nifi.components.AsyncLoadedProcessor; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.controller.ComponentNode; +import org.apache.nifi.controller.flow.FlowManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public final class FlowValidator { + + private static final Logger LOGGER = LoggerFactory.getLogger(FlowValidator.class); + + private static final Set INITIALIZING_ASYNC_PROCESSOR_STATES = + Set.of(INITIALIZING_ENVIRONMENT, DOWNLOADING_DEPENDENCIES, LOADING_PROCESSOR_CODE); + + private static final int ASYNC_LOADING_COMPONENT_INIT_RETRY_PAUSE_DURATION_MS = 5000; + private static final int ASYNC_LOADING_COMPONENT_INIT_MAX_RETRIES = 60; + private static final int VALIDATION_RETRY_PAUSE_DURATION_MS = 1000; + private static final int VALIDATION_MAX_RETRIES = 5; + + private FlowValidator() { + throw new UnsupportedOperationException(); + } + + public static List validate(FlowManager flowManager) { + List componentNodes = extractComponentNodes(flowManager); + + retry(() -> initializingAsyncLoadingComponents(componentNodes), List::isEmpty, + ASYNC_LOADING_COMPONENT_INIT_MAX_RETRIES, ASYNC_LOADING_COMPONENT_INIT_RETRY_PAUSE_DURATION_MS) + .ifPresent(components -> { + LOGGER.error("The following components are async loading components and are still initializing: {}", components); + throw new IllegalStateException("Maximum retry number exceeded while waiting for async loading components to be initialized"); + }); + + retry(() -> componentsInValidatingState(componentNodes), List::isEmpty, VALIDATION_MAX_RETRIES, VALIDATION_RETRY_PAUSE_DURATION_MS) + .ifPresent(components -> { + LOGGER.error("The following components are still in VALIDATING state: {}", components); + throw new IllegalStateException("Maximum retry number exceeded while waiting for components to be validated"); + }); + + return componentNodes.stream() + .map(ComponentNode::getValidationErrors) + .flatMap(Collection::stream) + .toList(); + } + + private static List extractComponentNodes(FlowManager flowManager) { + return Stream.of( + flowManager.getAllControllerServices(), + flowManager.getAllReportingTasks(), + Set.copyOf(flowManager.getRootGroup().findAllProcessors())) + .flatMap(Set::stream) + .toList(); + } + + private static List componentsInValidatingState(List componentNodes) { + return componentNodes.stream() + .filter(componentNode -> componentNode.performValidation() == VALIDATING) + .toList(); + } + + private static List initializingAsyncLoadingComponents(List componentNodes) { + return componentNodes.stream() + .filter(componentNode -> componentNode.performValidation() == INVALID) + .filter(componentNode -> componentNode.getComponent() instanceof AsyncLoadedProcessor asyncLoadedProcessor + && INITIALIZING_ASYNC_PROCESSOR_STATES.contains(asyncLoadedProcessor.getState())) + .toList(); + } +} diff --git a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-server/src/main/java/org/apache/nifi/minifi/StandardMiNiFiServer.java b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-server/src/main/java/org/apache/nifi/minifi/StandardMiNiFiServer.java index 132be38e59..e00e933c59 100644 --- a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-server/src/main/java/org/apache/nifi/minifi/StandardMiNiFiServer.java +++ b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-server/src/main/java/org/apache/nifi/minifi/StandardMiNiFiServer.java @@ -19,11 +19,14 @@ package org.apache.nifi.minifi; import static java.util.Optional.ofNullable; import static org.apache.commons.lang3.StringUtils.EMPTY; +import static org.apache.nifi.minifi.validator.FlowValidator.validate; import java.io.IOException; import java.io.UncheckedIOException; +import java.util.List; import java.util.Optional; import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.components.ValidationResult; import org.apache.nifi.headless.HeadlessNiFiServer; import org.apache.nifi.minifi.bootstrap.BootstrapListener; import org.apache.nifi.minifi.c2.C2NifiClientService; @@ -35,28 +38,14 @@ import org.apache.nifi.util.NiFiProperties; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -/** - * - */ public class StandardMiNiFiServer extends HeadlessNiFiServer implements MiNiFiServer { private static final Logger logger = LoggerFactory.getLogger(StandardMiNiFiServer.class); - public static final String BOOTSTRAP_PORT_PROPERTY = "nifi.bootstrap.listen.port"; + private static final String BOOTSTRAP_PORT_PROPERTY = "nifi.bootstrap.listen.port"; private BootstrapListener bootstrapListener; - - /* A reference to the client service for handling*/ private C2NifiClientService c2NifiClientService; - - public StandardMiNiFiServer() { - super(); - } - - public FlowStatusReport getStatusReport(String requestString) throws StatusRequestException { - return StatusConfigReporter.getStatus(getFlowController(), requestString, logger); - } - @Override public void start() { super.start(); @@ -67,6 +56,16 @@ public class StandardMiNiFiServer extends HeadlessNiFiServer implements MiNiFiSe startHeartbeat(); } + @Override + protected void validateFlow() { + List validationErrors = validate(getFlowController().getFlowManager()); + if (!validationErrors.isEmpty()) { + logger.error("Validation errors found when loading the flow: {}", validationErrors); + throw new IllegalStateException("Unable to start flow due to validation errors"); + } + logger.info("Flow validated successfully"); + } + @Override public void stop(boolean reload) { super.stop(); @@ -86,6 +85,10 @@ public class StandardMiNiFiServer extends HeadlessNiFiServer implements MiNiFiSe } } + public FlowStatusReport getStatusReport(String requestString) throws StatusRequestException { + return StatusConfigReporter.getStatus(getFlowController(), requestString, logger); + } + private void initC2() { if (Boolean.parseBoolean(getNiFiProperties().getProperty(MiNiFiProperties.C2_ENABLE.getKey(), MiNiFiProperties.C2_ENABLE.getDefaultValue()))) { NiFiProperties niFiProperties = getNiFiProperties(); diff --git a/nifi-framework-bundle/nifi-framework/nifi-headless-server/src/main/java/org/apache/nifi/headless/HeadlessNiFiServer.java b/nifi-framework-bundle/nifi-framework/nifi-headless-server/src/main/java/org/apache/nifi/headless/HeadlessNiFiServer.java index f64a738358..66320bcbc0 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-headless-server/src/main/java/org/apache/nifi/headless/HeadlessNiFiServer.java +++ b/nifi-framework-bundle/nifi-framework/nifi-headless-server/src/main/java/org/apache/nifi/headless/HeadlessNiFiServer.java @@ -169,6 +169,7 @@ public class HeadlessNiFiServer implements NiFiServer { flowService.start(); flowService.load(null); flowController.onFlowInitialized(true); + validateFlow(); FlowManager flowManager = flowController.getFlowManager(); flowManager.getGroup(flowManager.getRootGroupId()).startProcessing(); @@ -195,6 +196,10 @@ public class HeadlessNiFiServer implements NiFiServer { } } + protected void validateFlow() { + logger.info("Flow validation not implemented. Proceeding without validating the flow"); + } + private void startUpFailure(Throwable t) { System.err.println("Failed to start flow service: " + t.getMessage()); System.err.println("Shutting down...");