NIFI-13541 [MiNiFi] Validate flow during startup

Signed-off-by: Csaba Bejan <bejan.csaba@gmail.com>

This closes #9073.
This commit is contained in:
Ferenc Kis 2024-07-11 12:35:07 +02:00 committed by Csaba Bejan
parent 94f8688545
commit dca7aac9b3
No known key found for this signature in database
GPG Key ID: C59951609F8BDDEB
6 changed files with 214 additions and 136 deletions

View File

@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.minifi.commons.utils;
import java.util.Optional;
import java.util.function.Predicate;
import java.util.function.Supplier;
/**
 * Static helper for polling a value until it satisfies a condition, with a bounded number of retries
 * and a fixed pause between attempts.
 */
public final class RetryUtil {

    private RetryUtil() {
        throw new UnsupportedOperationException();
    }

    /**
     * Repeatedly obtains a value from {@code input} until {@code predicate} accepts it or the retry budget is used up.
     *
     * <p>The supplier is always invoked at least once and at most {@code maxRetries + 1} times. The calling thread
     * sleeps for {@code pauseDurationMillis} between two consecutive attempts.
     *
     * <p>If the calling thread is interrupted while pausing, the interrupt status is restored and polling stops
     * immediately; the last (unaccepted) value is returned so that callers treat the outcome as a failure.
     *
     * @param input               supplier of the value to check; invoked once per attempt
     * @param predicate           condition that signals success when it returns {@code true}
     * @param maxRetries          number of additional attempts after the first one; zero or a negative value means
     *                            a single attempt
     * @param pauseDurationMillis pause between attempts in milliseconds; must not be negative
     *                            (otherwise {@link Thread#sleep(long)} throws {@link IllegalArgumentException})
     * @param <T>                 type of the polled value
     * @return an empty {@link Optional} if the predicate accepted a value; otherwise an {@link Optional} holding the
     *         last value that was rejected. Note that a rejected {@code null} value also yields an empty
     *         {@link Optional}, because it is wrapped with {@link Optional#ofNullable(Object)}
     */
    public static <T> Optional<T> retry(Supplier<T> input, Predicate<T> predicate, int maxRetries, int pauseDurationMillis) {
        int retries = 0;
        while (true) {
            T t = input.get();
            if (predicate.test(t)) {
                return Optional.empty();
            }
            // ">=" instead of "==" so that a negative maxRetries cannot turn this into an unbounded loop
            if (retries >= maxRetries) {
                return Optional.ofNullable(t);
            }
            retries++;
            try {
                Thread.sleep(pauseDurationMillis);
            } catch (InterruptedException e) {
                // Do not swallow the interruption: restore the flag for the caller and stop polling,
                // otherwise every subsequent sleep would fail immediately and the loop would spin.
                Thread.currentThread().interrupt();
                return Optional.ofNullable(t);
            }
        }
    }
}

View File

@ -145,6 +145,11 @@ limitations under the License.
<artifactId>nifi-framework-core</artifactId>
<version>2.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi.minifi</groupId>
<artifactId>minifi-commons-utils</artifactId>
<version>2.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-python-framework-api</artifactId>

View File

@ -17,15 +17,35 @@
package org.apache.nifi.minifi.c2.command;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.Collections.emptySet;
import static java.util.UUID.randomUUID;
import static java.util.function.Predicate.not;
import static org.apache.nifi.minifi.commons.api.MiNiFiConstants.BACKUP_EXTENSION;
import static org.apache.nifi.minifi.commons.api.MiNiFiConstants.RAW_EXTENSION;
import static org.apache.nifi.minifi.commons.util.FlowUpdateUtils.backup;
import static org.apache.nifi.minifi.commons.util.FlowUpdateUtils.persist;
import static org.apache.nifi.minifi.commons.util.FlowUpdateUtils.removeIfExists;
import static org.apache.nifi.minifi.commons.util.FlowUpdateUtils.revert;
import static org.apache.nifi.minifi.commons.utils.RetryUtil.retry;
import static org.apache.nifi.minifi.validator.FlowValidator.validate;
import java.io.IOException;
import java.nio.file.Path;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.io.FilenameUtils;
import org.apache.nifi.c2.client.service.operation.UpdateConfigurationStrategy;
import org.apache.nifi.components.AsyncLoadedProcessor;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.controller.ComponentNode;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.flow.FlowManager;
import org.apache.nifi.controller.flow.VersionedDataflow;
import org.apache.nifi.flow.VersionedConnection;
import org.apache.nifi.flow.VersionedProcessGroup;
@ -38,49 +58,12 @@ import org.apache.nifi.services.FlowService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.file.Path;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.Collections.emptySet;
import static java.util.UUID.randomUUID;
import static java.util.function.Predicate.not;
import static org.apache.nifi.components.AsyncLoadedProcessor.LoadState.DOWNLOADING_DEPENDENCIES;
import static org.apache.nifi.components.AsyncLoadedProcessor.LoadState.INITIALIZING_ENVIRONMENT;
import static org.apache.nifi.components.AsyncLoadedProcessor.LoadState.LOADING_PROCESSOR_CODE;
import static org.apache.nifi.components.validation.ValidationStatus.INVALID;
import static org.apache.nifi.components.validation.ValidationStatus.VALIDATING;
import static org.apache.nifi.minifi.commons.api.MiNiFiConstants.BACKUP_EXTENSION;
import static org.apache.nifi.minifi.commons.api.MiNiFiConstants.RAW_EXTENSION;
import static org.apache.nifi.minifi.commons.util.FlowUpdateUtils.backup;
import static org.apache.nifi.minifi.commons.util.FlowUpdateUtils.persist;
import static org.apache.nifi.minifi.commons.util.FlowUpdateUtils.removeIfExists;
import static org.apache.nifi.minifi.commons.util.FlowUpdateUtils.revert;
public class DefaultUpdateConfigurationStrategy implements UpdateConfigurationStrategy {
private static final Logger LOGGER = LoggerFactory.getLogger(DefaultUpdateConfigurationStrategy.class);
private static final Set<AsyncLoadedProcessor.LoadState> INITIALIZING_ASYNC_PROCESSOR_STATES =
Set.of(INITIALIZING_ENVIRONMENT, DOWNLOADING_DEPENDENCIES, LOADING_PROCESSOR_CODE);
private static int ASYNC_LOADING_COMPONENT_INIT_RETRY_PAUSE_DURATION_MS = 5000;
private static int ASYNC_LOADING_COMPONENT_INIT_MAX_RETRIES = 60;
private static int VALIDATION_RETRY_PAUSE_DURATION_MS = 1000;
private static int VALIDATION_MAX_RETRIES = 5;
private static int FLOW_DRAIN_RETRY_PAUSE_DURATION_MS = 1000;
private static int FLOW_DRAIN_MAX_RETRIES = 60;
private static final int FLOW_DRAIN_RETRY_PAUSE_DURATION_MS = 1000;
private static final int FLOW_DRAIN_MAX_RETRIES = 60;
private final FlowController flowController;
private final FlowService flowService;
@ -115,9 +98,9 @@ public class DefaultUpdateConfigurationStrategy implements UpdateConfigurationSt
Set<String> originalConnectionIds = emptySet();
try {
originalConnectionIds = findAllExistingConnections(flowController.getFlowManager().getRootGroup())
.stream()
.map(Connection::getIdentifier)
.collect(Collectors.toSet());
.stream()
.map(Connection::getIdentifier)
.collect(Collectors.toSet());
VersionedDataflow rawDataFlow = flowSerDeService.deserialize(rawFlow);
VersionedDataflow propertyEncryptedRawDataFlow = flowPropertyEncryptor.encryptSensitiveProperties(rawDataFlow);
@ -187,9 +170,9 @@ public class DefaultUpdateConfigurationStrategy implements UpdateConfigurationSt
return rootProcessGroup -> {
LOGGER.warn("Flow did not stop within graceful period. Force stopping flow and emptying non referenced queues");
findAllExistingConnections(rootProcessGroup).stream()
.filter(connection -> !proposedConnectionIds.contains(connection.getIdentifier()))
.map(Connection::getFlowFileQueue)
.forEach(queue -> queue.dropFlowFiles(randomUUID().toString(), randomUUID().toString()));
.filter(connection -> !proposedConnectionIds.contains(connection.getIdentifier()))
.map(Connection::getFlowFileQueue)
.forEach(queue -> queue.dropFlowFiles(randomUUID().toString(), randomUUID().toString()));
};
}
@ -216,84 +199,21 @@ public class DefaultUpdateConfigurationStrategy implements UpdateConfigurationSt
}
}
/**
 * Collects the validation errors of every component known to the given flow manager.
 *
 * <p>Before the errors are gathered, this method first waits (with retries) until no async loading component
 * is still initializing, then waits until no component reports the VALIDATING status any more.
 *
 * @param flowManager source of the controller services, reporting tasks and processors to validate
 * @return all validation errors reported by the components
 * @throws IllegalStateException if either waiting phase runs out of retries
 */
private List<ValidationResult> validate(FlowManager flowManager) {
    List<? extends ComponentNode> allComponents = extractComponentNodes(flowManager);

    // Phase 1: give async loading components time to finish their initialization
    retry(
        () -> initializingAsyncLoadingComponents(allComponents),
        List::isEmpty,
        ASYNC_LOADING_COMPONENT_INIT_MAX_RETRIES,
        ASYNC_LOADING_COMPONENT_INIT_RETRY_PAUSE_DURATION_MS
    ).ifPresent(stillInitializing -> {
        LOGGER.error("The following components are async loading components and are still initializing: {}", stillInitializing);
        throw new IllegalStateException("Maximum retry number exceeded while waiting for async loading components to be initialized");
    });

    // Phase 2: wait for in-flight validations to settle
    retry(
        () -> componentsInValidatingState(allComponents),
        List::isEmpty,
        VALIDATION_MAX_RETRIES,
        VALIDATION_RETRY_PAUSE_DURATION_MS
    ).ifPresent(stillValidating -> {
        LOGGER.error("The following components are still in VALIDATING state: {}", stillValidating);
        throw new IllegalStateException("Maximum retry number exceeded while waiting for components to be validated");
    });

    return allComponents.stream()
        .map(ComponentNode::getValidationErrors)
        .flatMap(Collection::stream)
        .toList();
}
/**
 * Gathers all controller services, all reporting tasks and every processor found under the root group
 * into a single list.
 *
 * @param flowManager the flow manager to read the components from
 * @return the combined list of component nodes
 */
private List<? extends ComponentNode> extractComponentNodes(FlowManager flowManager) {
    Set<? extends ComponentNode> controllerServices = flowManager.getAllControllerServices();
    Set<? extends ComponentNode> reportingTasks = flowManager.getAllReportingTasks();
    // findAllProcessors() is copied into a Set so that all three sources share the same shape
    Set<? extends ComponentNode> processors = Set.copyOf(flowManager.getRootGroup().findAllProcessors());

    return Stream.of(controllerServices, reportingTasks, processors)
        .flatMap(Set::stream)
        .toList();
}
/**
 * Returns the subset of the given components whose validation currently reports the VALIDATING status.
 * Calling this triggers {@code performValidation()} on every component.
 *
 * @param componentNodes the components to inspect
 * @return components that are still being validated; empty when none are
 */
private List<? extends ComponentNode> componentsInValidatingState(List<? extends ComponentNode> componentNodes) {
    return componentNodes.stream()
        .filter(node -> VALIDATING == node.performValidation())
        .toList();
}
/**
 * Returns the components that are INVALID only because they are async loaded processors which have not
 * finished initializing yet (their load state is one of the initializing states).
 *
 * @param componentNodes the components to inspect
 * @return async loading components that are still initializing; empty when none are
 */
private List<? extends ComponentNode> initializingAsyncLoadingComponents(List<? extends ComponentNode> componentNodes) {
    return componentNodes.stream()
        .filter(node ->
            // validation is evaluated first; the component itself is only examined for INVALID nodes
            node.performValidation() == INVALID
                && node.getComponent() instanceof AsyncLoadedProcessor asyncProcessor
                && INITIALIZING_ASYNC_PROCESSOR_STATES.contains(asyncProcessor.getState()))
        .toList();
}
/**
 * Repeatedly obtains a value from {@code input} until {@code predicate} accepts it or the retry budget is used up.
 *
 * <p>The supplier is invoked at least once and at most {@code maxRetries + 1} times, with a pause of
 * {@code pauseDurationMillis} between attempts. If the thread is interrupted while pausing, the interrupt
 * status is restored and polling stops, returning the last rejected value.
 *
 * @param input               supplier of the value to check
 * @param predicate           condition that signals success when it returns {@code true}
 * @param maxRetries          number of additional attempts after the first; zero or negative means a single attempt
 * @param pauseDurationMillis pause between attempts in milliseconds
 * @param <T>                 type of the polled value
 * @return an empty {@link Optional} on success; otherwise the last rejected value wrapped with
 *         {@link Optional#ofNullable(Object)} (so a rejected {@code null} also yields an empty {@link Optional})
 */
private <T> Optional<T> retry(Supplier<T> input, Predicate<T> predicate, int maxRetries, int pauseDurationMillis) {
    int retries = 0;
    while (true) {
        T t = input.get();
        if (predicate.test(t)) {
            return Optional.empty();
        }
        // ">=" instead of "==" so that a negative maxRetries cannot turn this into an unbounded loop
        if (retries >= maxRetries) {
            return Optional.ofNullable(t);
        }
        retries++;
        try {
            Thread.sleep(pauseDurationMillis);
        } catch (InterruptedException e) {
            // Restore the interrupt flag and stop polling instead of silently swallowing the interruption
            Thread.currentThread().interrupt();
            return Optional.ofNullable(t);
        }
    }
}
private Set<String> findAllProposedConnectionIds(VersionedProcessGroup versionedProcessGroup) {
return versionedProcessGroup == null
? emptySet()
: Stream.concat(
versionedProcessGroup.getConnections().stream().map(VersionedConnection::getInstanceIdentifier),
versionedProcessGroup.getProcessGroups().stream().map(this::findAllProposedConnectionIds).flatMap(Set::stream)
).collect(Collectors.toSet());
? emptySet()
: Stream.concat(
versionedProcessGroup.getConnections().stream().map(VersionedConnection::getInstanceIdentifier),
versionedProcessGroup.getProcessGroups().stream().map(this::findAllProposedConnectionIds).flatMap(Set::stream)
).collect(Collectors.toSet());
}
private Set<Connection> findAllExistingConnections(ProcessGroup processGroup) {
return processGroup == null
? emptySet()
: Stream.concat(
processGroup.getConnections().stream(),
processGroup.getProcessGroups().stream().map(this::findAllExistingConnections).flatMap(Set::stream)
).collect(Collectors.toSet());
? emptySet()
: Stream.concat(
processGroup.getConnections().stream(),
processGroup.getProcessGroups().stream().map(this::findAllExistingConnections).flatMap(Set::stream)
).collect(Collectors.toSet());
}
}

View File

@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.minifi.validator;
import static org.apache.nifi.components.AsyncLoadedProcessor.LoadState.DOWNLOADING_DEPENDENCIES;
import static org.apache.nifi.components.AsyncLoadedProcessor.LoadState.INITIALIZING_ENVIRONMENT;
import static org.apache.nifi.components.AsyncLoadedProcessor.LoadState.LOADING_PROCESSOR_CODE;
import static org.apache.nifi.components.validation.ValidationStatus.INVALID;
import static org.apache.nifi.components.validation.ValidationStatus.VALIDATING;
import static org.apache.nifi.minifi.commons.utils.RetryUtil.retry;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.stream.Stream;
import org.apache.nifi.components.AsyncLoadedProcessor;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.controller.ComponentNode;
import org.apache.nifi.controller.flow.FlowManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Static helper that validates all components of a flow: controller services, reporting tasks and processors.
 * Validation waits for async loading components to finish initializing and for pending validations to settle
 * before the validation errors are collected.
 */
public final class FlowValidator {

    private static final Logger LOGGER = LoggerFactory.getLogger(FlowValidator.class);

    /** Load states in which an async loaded processor is considered to be still initializing. */
    private static final Set<AsyncLoadedProcessor.LoadState> INITIALIZING_ASYNC_PROCESSOR_STATES =
        Set.of(INITIALIZING_ENVIRONMENT, DOWNLOADING_DEPENDENCIES, LOADING_PROCESSOR_CODE);

    private static final int ASYNC_LOADING_COMPONENT_INIT_RETRY_PAUSE_DURATION_MS = 5000;
    private static final int ASYNC_LOADING_COMPONENT_INIT_MAX_RETRIES = 60;
    private static final int VALIDATION_RETRY_PAUSE_DURATION_MS = 1000;
    private static final int VALIDATION_MAX_RETRIES = 5;

    private FlowValidator() {
        throw new UnsupportedOperationException();
    }

    /**
     * Collects the validation errors of every component known to the given flow manager.
     *
     * @param flowManager source of the controller services, reporting tasks and processors to validate
     * @return all validation errors reported by the components; empty when the flow is valid
     * @throws IllegalStateException if async loading components are still initializing, or components are still
     *                               in VALIDATING state, after the maximum number of retries
     */
    public static List<ValidationResult> validate(FlowManager flowManager) {
        List<? extends ComponentNode> allComponents = extractComponentNodes(flowManager);

        awaitAsyncLoadingComponents(allComponents);
        awaitValidationCompletion(allComponents);

        return allComponents.stream()
            .map(ComponentNode::getValidationErrors)
            .flatMap(Collection::stream)
            .toList();
    }

    /** Blocks until no async loading component is initializing any more, or fails once the retries are used up. */
    private static void awaitAsyncLoadingComponents(List<? extends ComponentNode> allComponents) {
        retry(
            () -> initializingAsyncLoadingComponents(allComponents),
            List::isEmpty,
            ASYNC_LOADING_COMPONENT_INIT_MAX_RETRIES,
            ASYNC_LOADING_COMPONENT_INIT_RETRY_PAUSE_DURATION_MS
        ).ifPresent(stillInitializing -> {
            LOGGER.error("The following components are async loading components and are still initializing: {}", stillInitializing);
            throw new IllegalStateException("Maximum retry number exceeded while waiting for async loading components to be initialized");
        });
    }

    /** Blocks until no component is in VALIDATING state any more, or fails once the retries are used up. */
    private static void awaitValidationCompletion(List<? extends ComponentNode> allComponents) {
        retry(
            () -> componentsInValidatingState(allComponents),
            List::isEmpty,
            VALIDATION_MAX_RETRIES,
            VALIDATION_RETRY_PAUSE_DURATION_MS
        ).ifPresent(stillValidating -> {
            LOGGER.error("The following components are still in VALIDATING state: {}", stillValidating);
            throw new IllegalStateException("Maximum retry number exceeded while waiting for components to be validated");
        });
    }

    /** Gathers controller services, reporting tasks and all processors under the root group into one list. */
    private static List<? extends ComponentNode> extractComponentNodes(FlowManager flowManager) {
        Set<? extends ComponentNode> controllerServices = flowManager.getAllControllerServices();
        Set<? extends ComponentNode> reportingTasks = flowManager.getAllReportingTasks();
        Set<? extends ComponentNode> processors = Set.copyOf(flowManager.getRootGroup().findAllProcessors());

        return Stream.of(controllerServices, reportingTasks, processors)
            .flatMap(Set::stream)
            .toList();
    }

    /** Returns the components whose validation currently reports the VALIDATING status. */
    private static List<? extends ComponentNode> componentsInValidatingState(List<? extends ComponentNode> componentNodes) {
        return componentNodes.stream()
            .filter(node -> VALIDATING == node.performValidation())
            .toList();
    }

    /** Returns the INVALID components that are async loaded processors still in one of the initializing states. */
    private static List<? extends ComponentNode> initializingAsyncLoadingComponents(List<? extends ComponentNode> componentNodes) {
        return componentNodes.stream()
            .filter(FlowValidator::isInitializingAsyncLoadingComponent)
            .toList();
    }

    /** Validation status is checked first; the wrapped component is only examined for INVALID nodes. */
    private static boolean isInitializingAsyncLoadingComponent(ComponentNode node) {
        if (node.performValidation() != INVALID) {
            return false;
        }
        return node.getComponent() instanceof AsyncLoadedProcessor asyncProcessor
            && INITIALIZING_ASYNC_PROCESSOR_STATES.contains(asyncProcessor.getState());
    }
}

View File

@ -19,11 +19,14 @@ package org.apache.nifi.minifi;
import static java.util.Optional.ofNullable;
import static org.apache.commons.lang3.StringUtils.EMPTY;
import static org.apache.nifi.minifi.validator.FlowValidator.validate;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.Optional;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.headless.HeadlessNiFiServer;
import org.apache.nifi.minifi.bootstrap.BootstrapListener;
import org.apache.nifi.minifi.c2.C2NifiClientService;
@ -35,28 +38,14 @@ import org.apache.nifi.util.NiFiProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
*
*/
public class StandardMiNiFiServer extends HeadlessNiFiServer implements MiNiFiServer {
private static final Logger logger = LoggerFactory.getLogger(StandardMiNiFiServer.class);
public static final String BOOTSTRAP_PORT_PROPERTY = "nifi.bootstrap.listen.port";
private static final String BOOTSTRAP_PORT_PROPERTY = "nifi.bootstrap.listen.port";
private BootstrapListener bootstrapListener;
/* A reference to the client service for handling*/
private C2NifiClientService c2NifiClientService;
public StandardMiNiFiServer() {
super();
}
public FlowStatusReport getStatusReport(String requestString) throws StatusRequestException {
return StatusConfigReporter.getStatus(getFlowController(), requestString, logger);
}
@Override
public void start() {
super.start();
@ -67,6 +56,16 @@ public class StandardMiNiFiServer extends HeadlessNiFiServer implements MiNiFiSe
startHeartbeat();
}
/**
 * Validates the loaded flow using {@code FlowValidator.validate} and refuses to continue when any
 * validation error is reported.
 *
 * @throws IllegalStateException if the flow has at least one validation error
 */
@Override
protected void validateFlow() {
    List<ValidationResult> flowErrors = validate(getFlowController().getFlowManager());

    if (flowErrors.isEmpty()) {
        logger.info("Flow validated successfully");
        return;
    }

    // Log the details; the exception itself only carries a generic message
    logger.error("Validation errors found when loading the flow: {}", flowErrors);
    throw new IllegalStateException("Unable to start flow due to validation errors");
}
@Override
public void stop(boolean reload) {
super.stop();
@ -86,6 +85,10 @@ public class StandardMiNiFiServer extends HeadlessNiFiServer implements MiNiFiSe
}
}
/**
 * Produces a status report for the flow by delegating to {@code StatusConfigReporter.getStatus}, passing this
 * server's flow controller and logger.
 *
 * @param requestString the status request, forwarded unchanged to the reporter
 * @return the report produced by the reporter
 * @throws StatusRequestException declared here; presumably raised by the reporter for an invalid request (confirm
 *                                against StatusConfigReporter)
 */
public FlowStatusReport getStatusReport(String requestString) throws StatusRequestException {
return StatusConfigReporter.getStatus(getFlowController(), requestString, logger);
}
private void initC2() {
if (Boolean.parseBoolean(getNiFiProperties().getProperty(MiNiFiProperties.C2_ENABLE.getKey(), MiNiFiProperties.C2_ENABLE.getDefaultValue()))) {
NiFiProperties niFiProperties = getNiFiProperties();

View File

@ -169,6 +169,7 @@ public class HeadlessNiFiServer implements NiFiServer {
flowService.start();
flowService.load(null);
flowController.onFlowInitialized(true);
validateFlow();
FlowManager flowManager = flowController.getFlowManager();
flowManager.getGroup(flowManager.getRootGroupId()).startProcessing();
@ -195,6 +196,10 @@ public class HeadlessNiFiServer implements NiFiServer {
}
}
/**
 * Extension hook called during startup after the flow has been loaded and initialized and before the root
 * process group starts processing.
 *
 * <p>This default implementation performs no validation and only logs that fact. Subclasses may override it to
 * validate the flow (StandardMiNiFiServer does so).
 */
protected void validateFlow() {
logger.info("Flow validation not implemented. Proceeding without validating the flow");
}
private void startUpFailure(Throwable t) {
System.err.println("Failed to start flow service: " + t.getMessage());
System.err.println("Shutting down...");