NIFI-12335 Fix MiNiFi startup issue

Signed-off-by: Csaba Bejan <bejan.csaba@gmail.com>

This closes #7998.
This commit is contained in:
Ferenc Erdei 2023-11-08 15:18:40 +01:00 committed by Csaba Bejan
parent 3dcfc919bb
commit 4430effe00
1 changed files with 9 additions and 13 deletions

View File

@ -26,6 +26,7 @@ import static java.util.Optional.ofNullable;
import static java.util.UUID.randomUUID; import static java.util.UUID.randomUUID;
import static java.util.stream.Collectors.toMap; import static java.util.stream.Collectors.toMap;
import static org.apache.commons.lang3.StringUtils.EMPTY; import static org.apache.commons.lang3.StringUtils.EMPTY;
import static org.apache.nifi.flow.ScheduledState.ENABLED;
import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.core.JsonFactory; import com.fasterxml.jackson.core.JsonFactory;
@ -90,6 +91,9 @@ public class FlowEnrichService {
} }
VersionedDataflow versionedDataflow = parseVersionedDataflow(flowCandidate); VersionedDataflow versionedDataflow = parseVersionedDataflow(flowCandidate);
versionedDataflow.setReportingTasks(ofNullable(versionedDataflow.getReportingTasks()).orElseGet(ArrayList::new));
versionedDataflow.setRegistries(ofNullable(versionedDataflow.getRegistries()).orElseGet(ArrayList::new));
versionedDataflow.setControllerServices(ofNullable(versionedDataflow.getControllerServices()).orElseGet(ArrayList::new));
Optional<Integer> maxConcurrentThreads = ofNullable(minifiProperties.getProperty(MiNiFiProperties.NIFI_MINIFI_FLOW_MAX_CONCURRENT_THREADS.getKey())) Optional<Integer> maxConcurrentThreads = ofNullable(minifiProperties.getProperty(MiNiFiProperties.NIFI_MINIFI_FLOW_MAX_CONCURRENT_THREADS.getKey()))
.map(Integer::parseInt); .map(Integer::parseInt);
@ -103,13 +107,10 @@ public class FlowEnrichService {
rootGroup.setInstanceIdentifier(randomUUID().toString()); rootGroup.setInstanceIdentifier(randomUUID().toString());
} }
rootGroup.getControllerServices().forEach(controllerService -> controllerService.setScheduledState(ENABLED));
Optional<VersionedControllerService> commonSslControllerService = createCommonSslControllerService(); Optional<VersionedControllerService> commonSslControllerService = createCommonSslControllerService();
commonSslControllerService commonSslControllerService.ifPresent(versionedDataflow.getControllerServices()::add);
.ifPresent(sslControllerService -> {
List<VersionedControllerService> currentControllerServices = ofNullable(versionedDataflow.getControllerServices()).orElseGet(ArrayList::new);
currentControllerServices.add(sslControllerService);
versionedDataflow.setControllerServices(currentControllerServices);
});
commonSslControllerService commonSslControllerService
.filter(__ -> parseBoolean(minifiProperties.getProperty(MiNiFiProperties.NIFI_MINIFI_FLOW_USE_PARENT_SSL.getKey()))) .filter(__ -> parseBoolean(minifiProperties.getProperty(MiNiFiProperties.NIFI_MINIFI_FLOW_USE_PARENT_SSL.getKey())))
@ -117,12 +118,7 @@ public class FlowEnrichService {
.ifPresent(commonSslControllerServiceInstanceId -> overrideProcessorsSslControllerService(rootGroup, commonSslControllerServiceInstanceId)); .ifPresent(commonSslControllerServiceInstanceId -> overrideProcessorsSslControllerService(rootGroup, commonSslControllerServiceInstanceId));
createProvenanceReportingTask(commonSslControllerService.map(VersionedComponent::getInstanceIdentifier).orElse(EMPTY)) createProvenanceReportingTask(commonSslControllerService.map(VersionedComponent::getInstanceIdentifier).orElse(EMPTY))
.ifPresent(provenanceReportingTask -> { .ifPresent(versionedDataflow.getReportingTasks()::add);
List<VersionedReportingTask> currentReportingTasks = ofNullable(versionedDataflow.getReportingTasks()).orElseGet(ArrayList::new);
currentReportingTasks.add(provenanceReportingTask);
versionedDataflow.setReportingTasks(currentReportingTasks);
});
byte[] enrichedFlow = toByteArray(versionedDataflow); byte[] enrichedFlow = toByteArray(versionedDataflow);
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Enriched flow with content: \n{}", new String(enrichedFlow, UTF_8)); LOG.debug("Enriched flow with content: \n{}", new String(enrichedFlow, UTF_8));
@ -159,7 +155,7 @@ public class FlowEnrichService {
sslControllerService.setName(COMMON_SSL_CONTEXT_SERVICE_NAME); sslControllerService.setName(COMMON_SSL_CONTEXT_SERVICE_NAME);
sslControllerService.setComments(EMPTY); sslControllerService.setComments(EMPTY);
sslControllerService.setType(STANDARD_RESTRICTED_SSL_CONTEXT_SERVICE); sslControllerService.setType(STANDARD_RESTRICTED_SSL_CONTEXT_SERVICE);
sslControllerService.setScheduledState(ScheduledState.ENABLED); sslControllerService.setScheduledState(ENABLED);
sslControllerService.setBulletinLevel(LogLevel.WARN.name()); sslControllerService.setBulletinLevel(LogLevel.WARN.name());
sslControllerService.setComponentType(ComponentType.CONTROLLER_SERVICE); sslControllerService.setComponentType(ComponentType.CONTROLLER_SERVICE);
sslControllerService.setBundle(createBundle(SSL_CONTEXT_SERVICE_NAR)); sslControllerService.setBundle(createBundle(SSL_CONTEXT_SERVICE_NAR));