mirror of https://github.com/apache/nifi.git
NIFI-13020 Fix legacy flow C2 update issue
Signed-off-by: Csaba Bejan <bejan.csaba@gmail.com> This closes #8624
This commit is contained in:
parent
8b39a65d5d
commit
bc589e273d
|
@ -20,26 +20,37 @@ package org.apache.nifi.minifi.commons.service;
|
||||||
import static java.lang.Boolean.FALSE;
|
import static java.lang.Boolean.FALSE;
|
||||||
import static java.lang.Boolean.parseBoolean;
|
import static java.lang.Boolean.parseBoolean;
|
||||||
import static java.util.Map.entry;
|
import static java.util.Map.entry;
|
||||||
|
import static java.util.Objects.isNull;
|
||||||
import static java.util.Optional.empty;
|
import static java.util.Optional.empty;
|
||||||
import static java.util.Optional.ofNullable;
|
import static java.util.Optional.ofNullable;
|
||||||
import static java.util.UUID.randomUUID;
|
import static java.util.UUID.randomUUID;
|
||||||
import static java.util.stream.Collectors.toMap;
|
import static java.util.stream.Collectors.toMap;
|
||||||
|
import static java.util.stream.Collectors.toSet;
|
||||||
import static org.apache.commons.lang3.StringUtils.EMPTY;
|
import static org.apache.commons.lang3.StringUtils.EMPTY;
|
||||||
|
import static org.apache.commons.lang3.StringUtils.isBlank;
|
||||||
import static org.apache.nifi.flow.ScheduledState.ENABLED;
|
import static org.apache.nifi.flow.ScheduledState.ENABLED;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.Map.Entry;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.function.Predicate;
|
||||||
|
import java.util.stream.Stream;
|
||||||
import org.apache.commons.lang3.StringUtils;
|
import org.apache.commons.lang3.StringUtils;
|
||||||
import org.apache.nifi.controller.flow.VersionedDataflow;
|
import org.apache.nifi.controller.flow.VersionedDataflow;
|
||||||
import org.apache.nifi.flow.Bundle;
|
import org.apache.nifi.flow.Bundle;
|
||||||
import org.apache.nifi.flow.ComponentType;
|
import org.apache.nifi.flow.ComponentType;
|
||||||
|
import org.apache.nifi.flow.ConnectableComponent;
|
||||||
import org.apache.nifi.flow.ControllerServiceAPI;
|
import org.apache.nifi.flow.ControllerServiceAPI;
|
||||||
|
import org.apache.nifi.flow.Position;
|
||||||
import org.apache.nifi.flow.ScheduledState;
|
import org.apache.nifi.flow.ScheduledState;
|
||||||
import org.apache.nifi.flow.VersionedComponent;
|
import org.apache.nifi.flow.VersionedComponent;
|
||||||
import org.apache.nifi.flow.VersionedControllerService;
|
import org.apache.nifi.flow.VersionedControllerService;
|
||||||
import org.apache.nifi.flow.VersionedProcessGroup;
|
import org.apache.nifi.flow.VersionedProcessGroup;
|
||||||
|
import org.apache.nifi.flow.VersionedProcessor;
|
||||||
|
import org.apache.nifi.flow.VersionedRemoteProcessGroup;
|
||||||
import org.apache.nifi.flow.VersionedReportingTask;
|
import org.apache.nifi.flow.VersionedReportingTask;
|
||||||
import org.apache.nifi.logging.LogLevel;
|
import org.apache.nifi.logging.LogLevel;
|
||||||
import org.apache.nifi.minifi.commons.api.MiNiFiProperties;
|
import org.apache.nifi.minifi.commons.api.MiNiFiProperties;
|
||||||
|
@ -68,6 +79,10 @@ public class StandardFlowEnrichService implements FlowEnrichService {
|
||||||
private static final String SITE_TO_SITE_REPORTING_NAR_ARTIFACT = "nifi-site-to-site-reporting-nar";
|
private static final String SITE_TO_SITE_REPORTING_NAR_ARTIFACT = "nifi-site-to-site-reporting-nar";
|
||||||
private static final String PROVENANCE_REPORTING_TASK_PROTOCOL = "HTTP";
|
private static final String PROVENANCE_REPORTING_TASK_PROTOCOL = "HTTP";
|
||||||
private static final String PROVENANCE_REPORTING_TASK_BEGINNING_OF_STREAM = "beginning-of-stream";
|
private static final String PROVENANCE_REPORTING_TASK_BEGINNING_OF_STREAM = "beginning-of-stream";
|
||||||
|
private static final String DEFAULT_BULLETIN_LEVEL = "WARN";
|
||||||
|
private static final String DEFAULT_EXECUTION_NODE = "ALL";
|
||||||
|
private static final Position DEFAULT_POSITION = new Position(0.0, 0.0);
|
||||||
|
private static final Predicate<? super VersionedComponent> IS_LEGACY_COMPONENT = versionedComponent -> isBlank(versionedComponent.getInstanceIdentifier());
|
||||||
|
|
||||||
private final ReadableProperties minifiProperties;
|
private final ReadableProperties minifiProperties;
|
||||||
|
|
||||||
|
@ -86,11 +101,11 @@ public class StandardFlowEnrichService implements FlowEnrichService {
|
||||||
maxConcurrentThreads.ifPresent(versionedDataflow::setMaxTimerDrivenThreadCount);
|
maxConcurrentThreads.ifPresent(versionedDataflow::setMaxTimerDrivenThreadCount);
|
||||||
|
|
||||||
VersionedProcessGroup rootGroup = versionedDataflow.getRootGroup();
|
VersionedProcessGroup rootGroup = versionedDataflow.getRootGroup();
|
||||||
if (rootGroup.getIdentifier() == null) {
|
if (isBlank(rootGroup.getIdentifier())) {
|
||||||
rootGroup.setIdentifier(randomUUID().toString());
|
rootGroup.setIdentifier(randomUUID().toString());
|
||||||
}
|
}
|
||||||
if (rootGroup.getInstanceIdentifier() == null) {
|
if (isNull(rootGroup.getPosition())) {
|
||||||
rootGroup.setInstanceIdentifier(randomUUID().toString());
|
rootGroup.setPosition(DEFAULT_POSITION);
|
||||||
}
|
}
|
||||||
|
|
||||||
rootGroup.getControllerServices().forEach(controllerService -> controllerService.setScheduledState(ENABLED));
|
rootGroup.getControllerServices().forEach(controllerService -> controllerService.setScheduledState(ENABLED));
|
||||||
|
@ -106,6 +121,13 @@ public class StandardFlowEnrichService implements FlowEnrichService {
|
||||||
createProvenanceReportingTask(commonSslControllerService.map(VersionedComponent::getInstanceIdentifier).orElse(EMPTY))
|
createProvenanceReportingTask(commonSslControllerService.map(VersionedComponent::getInstanceIdentifier).orElse(EMPTY))
|
||||||
.ifPresent(versionedDataflow.getReportingTasks()::add);
|
.ifPresent(versionedDataflow.getReportingTasks()::add);
|
||||||
|
|
||||||
|
if (IS_LEGACY_COMPONENT.test(rootGroup)) {
|
||||||
|
LOG.info("Legacy flow detected. Initializing missing but mandatory properties on components");
|
||||||
|
initializeComponentsMissingProperties(rootGroup);
|
||||||
|
Map<String, String> idToInstanceIdMap = createIdToInstanceIdMap(rootGroup);
|
||||||
|
setConnectableComponentsInstanceId(rootGroup, idToInstanceIdMap);
|
||||||
|
}
|
||||||
|
|
||||||
return versionedDataflow;
|
return versionedDataflow;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -225,6 +247,88 @@ public class StandardFlowEnrichService implements FlowEnrichService {
|
||||||
entry("SSL Context Service", sslControllerServiceIdentifier))
|
entry("SSL Context Service", sslControllerServiceIdentifier))
|
||||||
.stream()
|
.stream()
|
||||||
.filter(entry -> StringUtils.isNotBlank(entry.getValue()))
|
.filter(entry -> StringUtils.isNotBlank(entry.getValue()))
|
||||||
.collect(toMap(Map.Entry::getKey, Map.Entry::getValue));
|
.collect(toMap(Entry::getKey, Entry::getValue));
|
||||||
|
}
|
||||||
|
|
||||||
|
private void initializeComponentsMissingProperties(VersionedProcessGroup versionedProcessGroup) {
|
||||||
|
versionedProcessGroup.setInstanceIdentifier(randomUUID().toString());
|
||||||
|
|
||||||
|
Stream.of(
|
||||||
|
ofNullable(versionedProcessGroup.getControllerServices()).orElse(Set.of()),
|
||||||
|
ofNullable(versionedProcessGroup.getConnections()).orElse(Set.of()),
|
||||||
|
ofNullable(versionedProcessGroup.getProcessors()).orElse(Set.of()),
|
||||||
|
ofNullable(versionedProcessGroup.getInputPorts()).orElse(Set.of()),
|
||||||
|
ofNullable(versionedProcessGroup.getOutputPorts()).orElse(Set.of()),
|
||||||
|
ofNullable(versionedProcessGroup.getFunnels()).orElse(Set.of()),
|
||||||
|
ofNullable(versionedProcessGroup.getRemoteProcessGroups()).orElse(Set.of()),
|
||||||
|
ofNullable(versionedProcessGroup.getRemoteProcessGroups()).orElse(Set.of())
|
||||||
|
.stream()
|
||||||
|
.map(VersionedRemoteProcessGroup::getInputPorts)
|
||||||
|
.flatMap(Set::stream)
|
||||||
|
.collect(toSet()),
|
||||||
|
ofNullable(versionedProcessGroup.getRemoteProcessGroups()).orElse(Set.of())
|
||||||
|
.stream()
|
||||||
|
.map(VersionedRemoteProcessGroup::getOutputPorts)
|
||||||
|
.flatMap(Set::stream)
|
||||||
|
.collect(toSet()))
|
||||||
|
.flatMap(Set::stream)
|
||||||
|
.filter(IS_LEGACY_COMPONENT)
|
||||||
|
.forEach(versionedComponent -> {
|
||||||
|
versionedComponent.setInstanceIdentifier(randomUUID().toString());
|
||||||
|
if (versionedComponent instanceof VersionedProcessor processor) {
|
||||||
|
if (isBlank(processor.getBulletinLevel())) {
|
||||||
|
processor.setBulletinLevel(DEFAULT_BULLETIN_LEVEL);
|
||||||
|
}
|
||||||
|
if (isBlank(processor.getExecutionNode())) {
|
||||||
|
processor.setExecutionNode(DEFAULT_EXECUTION_NODE);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
versionedProcessGroup.getProcessGroups().forEach(this::initializeComponentsMissingProperties);
|
||||||
|
}
|
||||||
|
|
||||||
|
private Map<String, String> createIdToInstanceIdMap(VersionedProcessGroup versionedProcessGroup) {
|
||||||
|
Map<String, String> thisProcessGroupIdToInstanceIdMaps = Stream.of(
|
||||||
|
ofNullable(versionedProcessGroup.getProcessors()).orElse(Set.of()),
|
||||||
|
ofNullable(versionedProcessGroup.getInputPorts()).orElse(Set.of()),
|
||||||
|
ofNullable(versionedProcessGroup.getOutputPorts()).orElse(Set.of()),
|
||||||
|
ofNullable(versionedProcessGroup.getFunnels()).orElse(Set.of()),
|
||||||
|
ofNullable(versionedProcessGroup.getRemoteProcessGroups()).orElse(Set.of())
|
||||||
|
.stream()
|
||||||
|
.map(VersionedRemoteProcessGroup::getInputPorts)
|
||||||
|
.flatMap(Set::stream)
|
||||||
|
.collect(toSet()),
|
||||||
|
ofNullable(versionedProcessGroup.getRemoteProcessGroups()).orElse(Set.of())
|
||||||
|
.stream()
|
||||||
|
.map(VersionedRemoteProcessGroup::getOutputPorts)
|
||||||
|
.flatMap(Set::stream)
|
||||||
|
.collect(toSet())
|
||||||
|
)
|
||||||
|
.flatMap(Set::stream)
|
||||||
|
.collect(toMap(VersionedComponent::getIdentifier, VersionedComponent::getInstanceIdentifier));
|
||||||
|
|
||||||
|
Stream<Map<String, String>> childProcessGroupsIdToInstanceIdMaps = ofNullable(versionedProcessGroup.getProcessGroups()).orElse(Set.of())
|
||||||
|
.stream()
|
||||||
|
.map(this::createIdToInstanceIdMap);
|
||||||
|
|
||||||
|
return Stream.concat(
|
||||||
|
Stream.of(thisProcessGroupIdToInstanceIdMaps),
|
||||||
|
childProcessGroupsIdToInstanceIdMaps)
|
||||||
|
.map(Map::entrySet)
|
||||||
|
.flatMap(Set::stream)
|
||||||
|
.collect(toMap(Entry::getKey, Entry::getValue));
|
||||||
|
}
|
||||||
|
|
||||||
|
private void setConnectableComponentsInstanceId(VersionedProcessGroup versionedProcessGroup, Map<String, String> idToInstanceIdMap) {
|
||||||
|
ofNullable(versionedProcessGroup.getConnections()).orElse(Set.of())
|
||||||
|
.forEach(connection -> {
|
||||||
|
ConnectableComponent source = connection.getSource();
|
||||||
|
source.setInstanceIdentifier(idToInstanceIdMap.get(source.getId()));
|
||||||
|
ConnectableComponent destination = connection.getDestination();
|
||||||
|
destination.setInstanceIdentifier(idToInstanceIdMap.get(destination.getId()));
|
||||||
|
});
|
||||||
|
ofNullable(versionedProcessGroup.getProcessGroups()).orElse(Set.of())
|
||||||
|
.forEach(childProcessGroup -> setConnectableComponentsInstanceId(childProcessGroup, idToInstanceIdMap));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue