NIFI-13436 Retain queue for non-modified connections during MiNiFi flow update

This closes #8998.

Signed-off-by: Ferenc Kis <briansolo1985@gmail.com>
This commit is contained in:
Ferenc Erdei 2024-06-24 15:03:02 +02:00 committed by Ferenc Kis
parent 3efb0763da
commit 903090b649
No known key found for this signature in database
GPG Key ID: 5E1CCAC15A5958F2

View File

@ -17,7 +17,43 @@
package org.apache.nifi.minifi.c2.command;
import org.apache.commons.io.FilenameUtils;
import org.apache.nifi.c2.client.service.operation.UpdateConfigurationStrategy;
import org.apache.nifi.components.AsyncLoadedProcessor;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.controller.ComponentNode;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.flow.FlowManager;
import org.apache.nifi.controller.flow.VersionedDataflow;
import org.apache.nifi.flow.VersionedConnection;
import org.apache.nifi.flow.VersionedProcessGroup;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.groups.RemoteProcessGroup;
import org.apache.nifi.minifi.commons.service.FlowEnrichService;
import org.apache.nifi.minifi.commons.service.FlowPropertyEncryptor;
import org.apache.nifi.minifi.commons.service.FlowSerDeService;
import org.apache.nifi.services.FlowService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.file.Path;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.Collections.emptySet;
import static java.util.UUID.randomUUID;
import static java.util.function.Predicate.not;
import static org.apache.nifi.components.AsyncLoadedProcessor.LoadState.DOWNLOADING_DEPENDENCIES;
@ -32,35 +68,6 @@ import static org.apache.nifi.minifi.commons.util.FlowUpdateUtils.persist;
import static org.apache.nifi.minifi.commons.util.FlowUpdateUtils.removeIfExists;
import static org.apache.nifi.minifi.commons.util.FlowUpdateUtils.revert;
import java.io.IOException;
import java.nio.file.Path;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Stream;
import org.apache.commons.io.FilenameUtils;
import org.apache.nifi.c2.client.service.operation.UpdateConfigurationStrategy;
import org.apache.nifi.components.AsyncLoadedProcessor;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.controller.ComponentNode;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.flow.FlowManager;
import org.apache.nifi.controller.flow.VersionedDataflow;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.groups.RemoteProcessGroup;
import org.apache.nifi.minifi.commons.service.FlowEnrichService;
import org.apache.nifi.minifi.commons.service.FlowPropertyEncryptor;
import org.apache.nifi.minifi.commons.service.FlowSerDeService;
import org.apache.nifi.services.FlowService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class DefaultUpdateConfigurationStrategy implements UpdateConfigurationStrategy {
private static final Logger LOGGER = LoggerFactory.getLogger(DefaultUpdateConfigurationStrategy.class);
@ -105,8 +112,12 @@ public class DefaultUpdateConfigurationStrategy implements UpdateConfigurationSt
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Attempting to update flow with content: \n{}", new String(rawFlow, UTF_8));
}
Set<String> originalConnectionIds = emptySet();
try {
originalConnectionIds = findAllExistingConnections(flowController.getFlowManager().getRootGroup())
.stream()
.map(Connection::getIdentifier)
.collect(Collectors.toSet());
VersionedDataflow rawDataFlow = flowSerDeService.deserialize(rawFlow);
VersionedDataflow propertyEncryptedRawDataFlow = flowPropertyEncryptor.encryptSensitiveProperties(rawDataFlow);
@ -120,7 +131,7 @@ public class DefaultUpdateConfigurationStrategy implements UpdateConfigurationSt
persist(serializedPropertyEncryptedRawDataFlow, rawFlowConfigurationFile, false);
persist(serializedEnrichedFlowCandidate, flowConfigurationFile, true);
reloadFlow();
reloadFlow(findAllProposedConnectionIds(enrichedFlowCandidate.getRootGroup()));
return true;
} catch (IllegalStateException e) {
@ -128,7 +139,7 @@ public class DefaultUpdateConfigurationStrategy implements UpdateConfigurationSt
revert(backupFlowConfigurationFile, flowConfigurationFile);
revert(backupRawFlowConfigurationFile, rawFlowConfigurationFile);
try {
reloadFlow();
reloadFlow(originalConnectionIds);
} catch (IOException ex) {
LOGGER.error("Unable to reload the reverted flow", e);
}
@ -144,9 +155,9 @@ public class DefaultUpdateConfigurationStrategy implements UpdateConfigurationSt
}
}
private void reloadFlow() throws IOException {
private void reloadFlow(Set<String> proposedConnectionIds) throws IOException {
LOGGER.info("Initiating flow reload");
stopFlowGracefully(flowController.getFlowManager().getRootGroup());
stopFlowGracefully(flowController.getFlowManager().getRootGroup(), proposedConnectionIds);
flowService.load(null);
flowController.onFlowInitialized(true);
@ -161,7 +172,7 @@ public class DefaultUpdateConfigurationStrategy implements UpdateConfigurationSt
LOGGER.info("Flow has been reloaded successfully");
}
private void stopFlowGracefully(ProcessGroup rootGroup) {
private void stopFlowGracefully(ProcessGroup rootGroup, Set<String> proposedConnectionIds) {
LOGGER.info("Stopping flow gracefully");
Optional<ProcessGroup> drainResult = stopSourceProcessorsAndWaitFlowToDrain(rootGroup);
@ -169,12 +180,17 @@ public class DefaultUpdateConfigurationStrategy implements UpdateConfigurationSt
rootGroup.getRemoteProcessGroups().stream()
.map(RemoteProcessGroup::stopTransmitting)
.forEach(this::waitForStopOrLogTimeOut);
drainResult.ifPresentOrElse(
rootProcessGroup -> {
LOGGER.warn("Flow did not stop within graceful period. Force stopping flow and emptying queues");
rootProcessGroup.dropAllFlowFiles(randomUUID().toString(), randomUUID().toString());
},
() -> LOGGER.info("Flow has been stopped gracefully"));
drainResult.ifPresentOrElse(emptyQueuesForNonReferencedQueues(proposedConnectionIds), () -> LOGGER.info("Flow has been stopped gracefully"));
}
private Consumer<ProcessGroup> emptyQueuesForNonReferencedQueues(Set<String> proposedConnectionIds) {
return rootProcessGroup -> {
LOGGER.warn("Flow did not stop within graceful period. Force stopping flow and emptying non referenced queues");
findAllExistingConnections(rootProcessGroup).stream()
.filter(connection -> !proposedConnectionIds.contains(connection.getIdentifier()))
.map(Connection::getFlowFileQueue)
.forEach(queue -> queue.dropFlowFiles(randomUUID().toString(), randomUUID().toString()));
};
}
private Optional<ProcessGroup> stopSourceProcessorsAndWaitFlowToDrain(ProcessGroup rootGroup) {
@ -262,4 +278,22 @@ public class DefaultUpdateConfigurationStrategy implements UpdateConfigurationSt
}
}
}
private Set<String> findAllProposedConnectionIds(VersionedProcessGroup versionedProcessGroup) {
return versionedProcessGroup == null
? emptySet()
: Stream.concat(
versionedProcessGroup.getConnections().stream().map(VersionedConnection::getInstanceIdentifier),
versionedProcessGroup.getProcessGroups().stream().map(this::findAllProposedConnectionIds).flatMap(Set::stream)
).collect(Collectors.toSet());
}
private Set<Connection> findAllExistingConnections(ProcessGroup processGroup) {
return processGroup == null
? emptySet()
: Stream.concat(
processGroup.getConnections().stream(),
processGroup.getProcessGroups().stream().map(this::findAllExistingConnections).flatMap(Set::stream)
).collect(Collectors.toSet());
}
}