mirror of https://github.com/apache/nifi.git
NIFI-1869 Cloning controller services when referenced by reporting tasks and upgrading from 0.x to 1.0.0. This closes #767
This commit is contained in:
parent
df11e1d2c0
commit
a1bb94c08a
|
@ -21,6 +21,7 @@ import org.apache.nifi.authorization.AbstractPolicyBasedAuthorizer;
|
|||
import org.apache.nifi.authorization.Authorizer;
|
||||
import org.apache.nifi.cluster.protocol.DataFlow;
|
||||
import org.apache.nifi.cluster.protocol.StandardDataFlow;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.connectable.Connectable;
|
||||
import org.apache.nifi.connectable.ConnectableType;
|
||||
import org.apache.nifi.connectable.Connection;
|
||||
|
@ -103,11 +104,13 @@ import java.nio.file.StandardOpenOption;
|
|||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.zip.GZIPInputStream;
|
||||
|
||||
/**
|
||||
|
@ -298,6 +301,21 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
|
|||
}
|
||||
}
|
||||
|
||||
// get all the reporting task elements
|
||||
final Element reportingTasksElement = DomUtils.getChild(rootElement, "reportingTasks");
|
||||
final List<Element> reportingTaskElements = new ArrayList<>();
|
||||
if (reportingTasksElement != null) {
|
||||
reportingTaskElements.addAll(DomUtils.getChildElementsByTagName(reportingTasksElement, "reportingTask"));
|
||||
}
|
||||
|
||||
// get/create all the reporting task nodes and DTOs, but don't apply their scheduled state yet
|
||||
final Map<ReportingTaskNode,ReportingTaskDTO> reportingTaskNodesToDTOs = new HashMap<>();
|
||||
for (final Element taskElement : reportingTaskElements) {
|
||||
final ReportingTaskDTO dto = FlowFromDOMFactory.getReportingTask(taskElement, encryptor);
|
||||
final ReportingTaskNode reportingTask = getOrCreateReportingTask(controller, dto, initialized, existingFlowEmpty);
|
||||
reportingTaskNodesToDTOs.put(reportingTask, dto);
|
||||
}
|
||||
|
||||
final Element controllerServicesElement = DomUtils.getChild(rootElement, "controllerServices");
|
||||
if (controllerServicesElement != null) {
|
||||
final List<Element> serviceElements = DomUtils.getChildElementsByTagName(controllerServicesElement, "controllerService");
|
||||
|
@ -308,7 +326,40 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
|
|||
// to the root Group. Otherwise, we want to use a null group, which indicates a Controller-level
|
||||
// Controller Service.
|
||||
final ProcessGroup group = (encodingVersion == null) ? rootGroup : null;
|
||||
ControllerServiceLoader.loadControllerServices(serviceElements, controller, group, encryptor, controller.getBulletinRepository(), autoResumeState);
|
||||
final Map<ControllerServiceNode, Element> controllerServices = ControllerServiceLoader.loadControllerServices(serviceElements, controller, group, encryptor);
|
||||
|
||||
// If we are moving controller services to the root group we also need to see if any reporting tasks
|
||||
// reference them, and if so we need to clone the CS and update the reporting task reference
|
||||
if (group != null) {
|
||||
// find all the controller service ids referenced by reporting tasks
|
||||
final Set<String> controllerServicesInReportingTasks = reportingTaskNodesToDTOs.keySet().stream()
|
||||
.flatMap(r -> r.getProperties().entrySet().stream())
|
||||
.filter(e -> e.getKey().getControllerServiceDefinition() != null)
|
||||
.map(e -> e.getValue())
|
||||
.collect(Collectors.toSet());
|
||||
|
||||
// find the controller service nodes for each id referenced by a reporting task
|
||||
final Set<ControllerServiceNode> controllerServicesToClone = controllerServices.keySet().stream()
|
||||
.filter(cs -> controllerServicesInReportingTasks.contains(cs.getIdentifier()))
|
||||
.collect(Collectors.toSet());
|
||||
|
||||
// clone the controller services and map the original id to the clone
|
||||
final Map<String,ControllerServiceNode> controllerServiceMapping = new HashMap<>();
|
||||
for (ControllerServiceNode controllerService : controllerServicesToClone) {
|
||||
final ControllerServiceNode clone = ControllerServiceLoader.cloneControllerService(controller, controllerService);
|
||||
controller.addRootControllerService(clone);
|
||||
controllerServiceMapping.put(controllerService.getIdentifier(), clone);
|
||||
}
|
||||
|
||||
// update the reporting tasks to reference the cloned controller services
|
||||
updateReportingTaskControllerServices(reportingTaskNodesToDTOs.keySet(), controllerServiceMapping);
|
||||
|
||||
// enable all the cloned controller services
|
||||
ControllerServiceLoader.enableControllerServices(controllerServiceMapping.values(), controller, autoResumeState);
|
||||
}
|
||||
|
||||
// enable all the original controller services
|
||||
ControllerServiceLoader.enableControllerServices(controllerServices, controller, encryptor, autoResumeState);
|
||||
} else {
|
||||
for (final Element serviceElement : serviceElements) {
|
||||
updateControllerService(controller, serviceElement, encryptor);
|
||||
|
@ -318,16 +369,9 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
|
|||
|
||||
scaleRootGroup(rootGroup, encodingVersion);
|
||||
|
||||
final Element reportingTasksElement = DomUtils.getChild(rootElement, "reportingTasks");
|
||||
if (reportingTasksElement != null) {
|
||||
final List<Element> taskElements = DomUtils.getChildElementsByTagName(reportingTasksElement, "reportingTask");
|
||||
for (final Element taskElement : taskElements) {
|
||||
if (!initialized || existingFlowEmpty) {
|
||||
addReportingTask(controller, taskElement, encryptor);
|
||||
} else {
|
||||
updateReportingTask(controller, taskElement, encryptor);
|
||||
}
|
||||
}
|
||||
// now that controller services are loaded and enabled we can apply the scheduled state to each reporting task
|
||||
for (Map.Entry<ReportingTaskNode,ReportingTaskDTO> entry : reportingTaskNodesToDTOs.entrySet()) {
|
||||
applyReportingTaskScheduleState(controller, entry.getValue(), entry.getKey(), initialized, existingFlowEmpty);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -359,6 +403,23 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
|
|||
}
|
||||
}
|
||||
|
||||
private void updateReportingTaskControllerServices(final Set<ReportingTaskNode> reportingTasks, final Map<String,ControllerServiceNode> controllerServiceMapping) {
|
||||
for (ReportingTaskNode reportingTask : reportingTasks) {
|
||||
if (reportingTask.getProperties() != null) {
|
||||
final Set<Map.Entry<PropertyDescriptor,String>> propertyDescriptors = reportingTask.getProperties().entrySet().stream()
|
||||
.filter(e -> e.getKey().getControllerServiceDefinition() != null)
|
||||
.filter(e -> controllerServiceMapping.containsKey(e.getValue()))
|
||||
.collect(Collectors.toSet());
|
||||
|
||||
for (Map.Entry<PropertyDescriptor,String> propEntry : propertyDescriptors) {
|
||||
final PropertyDescriptor propertyDescriptor = propEntry.getKey();
|
||||
final ControllerServiceNode clone = controllerServiceMapping.get(propEntry.getValue());
|
||||
reportingTask.setProperty(propertyDescriptor.getName(), clone.getIdentifier());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void addLocalTemplates(final Element processGroupElement, final ProcessGroup processGroup, final FlowEncodingVersion encodingVersion) {
|
||||
// Replace the templates with those from the proposed flow
|
||||
final List<Element> templateNodeList = getChildrenByTagName(processGroupElement, "template");
|
||||
|
@ -461,35 +522,53 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
|
|||
}
|
||||
}
|
||||
|
||||
private void addReportingTask(final FlowController controller, final Element reportingTaskElement, final StringEncryptor encryptor) throws ReportingTaskInstantiationException {
|
||||
final ReportingTaskDTO dto = FlowFromDOMFactory.getReportingTask(reportingTaskElement, encryptor);
|
||||
private ReportingTaskNode getOrCreateReportingTask(final FlowController controller, final ReportingTaskDTO dto, final boolean controllerInitialized, final boolean existingFlowEmpty)
|
||||
throws ReportingTaskInstantiationException {
|
||||
// create a new reporting task node when the controller is not initialized or the flow is empty
|
||||
if (!controllerInitialized || existingFlowEmpty) {
|
||||
final ReportingTaskNode reportingTask = controller.createReportingTask(dto.getType(), dto.getId(), false);
|
||||
reportingTask.setName(dto.getName());
|
||||
reportingTask.setComments(dto.getComments());
|
||||
reportingTask.setScheduldingPeriod(dto.getSchedulingPeriod());
|
||||
reportingTask.setSchedulingStrategy(SchedulingStrategy.valueOf(dto.getSchedulingStrategy()));
|
||||
|
||||
final ReportingTaskNode reportingTask = controller.createReportingTask(dto.getType(), dto.getId(), false);
|
||||
reportingTask.setName(dto.getName());
|
||||
reportingTask.setComments(dto.getComments());
|
||||
reportingTask.setScheduldingPeriod(dto.getSchedulingPeriod());
|
||||
reportingTask.setSchedulingStrategy(SchedulingStrategy.valueOf(dto.getSchedulingStrategy()));
|
||||
reportingTask.setAnnotationData(dto.getAnnotationData());
|
||||
|
||||
reportingTask.setAnnotationData(dto.getAnnotationData());
|
||||
|
||||
for (final Map.Entry<String, String> entry : dto.getProperties().entrySet()) {
|
||||
if (entry.getValue() == null) {
|
||||
reportingTask.removeProperty(entry.getKey());
|
||||
} else {
|
||||
reportingTask.setProperty(entry.getKey(), entry.getValue());
|
||||
for (final Map.Entry<String, String> entry : dto.getProperties().entrySet()) {
|
||||
if (entry.getValue() == null) {
|
||||
reportingTask.removeProperty(entry.getKey());
|
||||
} else {
|
||||
reportingTask.setProperty(entry.getKey(), entry.getValue());
|
||||
}
|
||||
}
|
||||
|
||||
final ComponentLog componentLog = new SimpleProcessLogger(dto.getId(), reportingTask.getReportingTask());
|
||||
final ReportingInitializationContext config = new StandardReportingInitializationContext(dto.getId(), dto.getName(),
|
||||
SchedulingStrategy.valueOf(dto.getSchedulingStrategy()), dto.getSchedulingPeriod(), componentLog, controller);
|
||||
|
||||
try {
|
||||
reportingTask.getReportingTask().initialize(config);
|
||||
} catch (final InitializationException ie) {
|
||||
throw new ReportingTaskInstantiationException("Failed to initialize reporting task of type " + dto.getType(), ie);
|
||||
}
|
||||
|
||||
return reportingTask;
|
||||
} else {
|
||||
// otherwise return the existing reporting task node
|
||||
return controller.getReportingTaskNode(dto.getId());
|
||||
}
|
||||
}
|
||||
|
||||
final ComponentLog componentLog = new SimpleProcessLogger(dto.getId(), reportingTask.getReportingTask());
|
||||
final ReportingInitializationContext config = new StandardReportingInitializationContext(dto.getId(), dto.getName(),
|
||||
SchedulingStrategy.valueOf(dto.getSchedulingStrategy()), dto.getSchedulingPeriod(), componentLog, controller);
|
||||
|
||||
try {
|
||||
reportingTask.getReportingTask().initialize(config);
|
||||
} catch (final InitializationException ie) {
|
||||
throw new ReportingTaskInstantiationException("Failed to initialize reporting task of type " + dto.getType(), ie);
|
||||
private void applyReportingTaskScheduleState(final FlowController controller, final ReportingTaskDTO dto, final ReportingTaskNode reportingTask,
|
||||
final boolean controllerInitialized, final boolean existingFlowEmpty) {
|
||||
if (!controllerInitialized || existingFlowEmpty) {
|
||||
applyNewReportingTaskScheduleState(controller, dto, reportingTask);
|
||||
} else {
|
||||
applyExistingReportingTaskScheduleState(controller, dto, reportingTask);
|
||||
}
|
||||
}
|
||||
|
||||
private void applyNewReportingTaskScheduleState(final FlowController controller, final ReportingTaskDTO dto, final ReportingTaskNode reportingTask) {
|
||||
if (autoResumeState) {
|
||||
if (ScheduledState.RUNNING.name().equals(dto.getState())) {
|
||||
try {
|
||||
|
@ -517,10 +596,7 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
|
|||
}
|
||||
}
|
||||
|
||||
private void updateReportingTask(final FlowController controller, final Element reportingTaskElement, final StringEncryptor encryptor) {
|
||||
final ReportingTaskDTO dto = FlowFromDOMFactory.getReportingTask(reportingTaskElement, encryptor);
|
||||
final ReportingTaskNode taskNode = controller.getReportingTaskNode(dto.getId());
|
||||
|
||||
private void applyExistingReportingTaskScheduleState(final FlowController controller, final ReportingTaskDTO dto, final ReportingTaskNode taskNode) {
|
||||
if (!taskNode.getScheduledState().name().equals(dto.getState())) {
|
||||
try {
|
||||
switch (ScheduledState.valueOf(dto.getState())) {
|
||||
|
@ -863,7 +939,8 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
|
|||
// Add Controller Services
|
||||
final List<Element> serviceNodeList = getChildrenByTagName(processGroupElement, "controllerService");
|
||||
if (!serviceNodeList.isEmpty()) {
|
||||
ControllerServiceLoader.loadControllerServices(serviceNodeList, controller, processGroup, encryptor, controller.getBulletinRepository(), autoResumeState);
|
||||
final Map<ControllerServiceNode, Element> controllerServices = ControllerServiceLoader.loadControllerServices(serviceNodeList, controller, processGroup, encryptor);
|
||||
ControllerServiceLoader.enableControllerServices(controllerServices, controller, encryptor, autoResumeState);
|
||||
}
|
||||
|
||||
// add processors
|
||||
|
|
|
@ -19,6 +19,7 @@ package org.apache.nifi.controller.service;
|
|||
import java.io.BufferedInputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
|
@ -26,11 +27,13 @@ import java.util.HashSet;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.UUID;
|
||||
|
||||
import javax.xml.parsers.DocumentBuilder;
|
||||
import javax.xml.parsers.DocumentBuilderFactory;
|
||||
import javax.xml.parsers.ParserConfigurationException;
|
||||
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.controller.FlowController;
|
||||
import org.apache.nifi.controller.serialization.FlowFromDOMFactory;
|
||||
import org.apache.nifi.encrypt.StringEncryptor;
|
||||
|
@ -91,14 +94,17 @@ public class ControllerServiceLoader {
|
|||
final Document document = builder.parse(in);
|
||||
final Element controllerServices = document.getDocumentElement();
|
||||
final List<Element> serviceElements = DomUtils.getChildElementsByTagName(controllerServices, "controllerService");
|
||||
return new ArrayList<>(loadControllerServices(serviceElements, controller, parentGroup, encryptor, bulletinRepo, autoResumeState));
|
||||
|
||||
final Map<ControllerServiceNode, Element> controllerServiceMap = ControllerServiceLoader.loadControllerServices(serviceElements, controller, parentGroup, encryptor);
|
||||
enableControllerServices(controllerServiceMap, controller, encryptor, autoResumeState);
|
||||
return new ArrayList<>(controllerServiceMap.keySet());
|
||||
} catch (SAXException | ParserConfigurationException sxe) {
|
||||
throw new IOException(sxe);
|
||||
}
|
||||
}
|
||||
|
||||
public static Collection<ControllerServiceNode> loadControllerServices(final List<Element> serviceElements, final FlowController controller, final ProcessGroup parentGroup,
|
||||
final StringEncryptor encryptor, final BulletinRepository bulletinRepo, final boolean autoResumeState) {
|
||||
public static Map<ControllerServiceNode, Element> loadControllerServices(final List<Element> serviceElements, final FlowController controller,
|
||||
final ProcessGroup parentGroup, final StringEncryptor encryptor) {
|
||||
|
||||
final Map<ControllerServiceNode, Element> nodeMap = new HashMap<>();
|
||||
for (final Element serviceElement : serviceElements) {
|
||||
|
@ -117,6 +123,11 @@ public class ControllerServiceLoader {
|
|||
configureControllerService(entry.getKey(), entry.getValue(), encryptor);
|
||||
}
|
||||
|
||||
return nodeMap;
|
||||
}
|
||||
|
||||
public static void enableControllerServices(final Map<ControllerServiceNode, Element> nodeMap, final FlowController controller,
|
||||
final StringEncryptor encryptor, final boolean autoResumeState) {
|
||||
// Start services
|
||||
if (autoResumeState) {
|
||||
final Set<ControllerServiceNode> nodesToEnable = new HashSet<>();
|
||||
|
@ -135,10 +146,34 @@ public class ControllerServiceLoader {
|
|||
}
|
||||
}
|
||||
|
||||
enableControllerServices(nodesToEnable, controller, autoResumeState);
|
||||
}
|
||||
}
|
||||
|
||||
public static void enableControllerServices(final Collection<ControllerServiceNode> nodesToEnable, final FlowController controller, final boolean autoResumeState) {
|
||||
// Start services
|
||||
if (autoResumeState) {
|
||||
controller.enableControllerServices(nodesToEnable);
|
||||
}
|
||||
}
|
||||
|
||||
return nodeMap.keySet();
|
||||
public static ControllerServiceNode cloneControllerService(final ControllerServiceProvider provider, final ControllerServiceNode controllerService) {
|
||||
// create a new id for the clone seeded from the original id so that it is consistent in a cluster
|
||||
final UUID id = UUID.nameUUIDFromBytes(controllerService.getIdentifier().getBytes(StandardCharsets.UTF_8));
|
||||
|
||||
final ControllerServiceNode clone = provider.createControllerService(controllerService.getCanonicalClassName(), id.toString(), false);
|
||||
clone.setName(controllerService.getName());
|
||||
clone.setComments(controllerService.getComments());
|
||||
|
||||
if (controllerService.getProperties() != null) {
|
||||
for (Map.Entry<PropertyDescriptor, String> propEntry : controllerService.getProperties().entrySet()) {
|
||||
if (propEntry.getValue() != null) {
|
||||
clone.setProperty(propEntry.getKey().getName(), propEntry.getValue());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return clone;
|
||||
}
|
||||
|
||||
private static ControllerServiceNode createControllerService(final ControllerServiceProvider provider, final Element controllerServiceElement, final StringEncryptor encryptor) {
|
||||
|
@ -162,4 +197,5 @@ public class ControllerServiceLoader {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
|
||||
package org.apache.nifi.controller;
|
||||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.nifi.admin.service.AuditService;
|
||||
import org.apache.nifi.authorization.AbstractPolicyBasedAuthorizer;
|
||||
import org.apache.nifi.authorization.AccessPolicy;
|
||||
|
@ -41,6 +42,9 @@ import org.junit.Before;
|
|||
import org.junit.Test;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.IOException;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.LinkedHashSet;
|
||||
import java.util.Set;
|
||||
|
@ -127,6 +131,104 @@ public class TestFlowController {
|
|||
controller.shutdown(true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSynchronizeFlowWithReportingTaskAndProcessorReferencingControllerService() throws IOException {
|
||||
// create a mock proposed data flow with the same auth fingerprint as the current authorizer
|
||||
final String authFingerprint = authorizer.getFingerprint();
|
||||
final DataFlow proposedDataFlow = Mockito.mock(DataFlow.class);
|
||||
when(proposedDataFlow.getAuthorizerFingerprint()).thenReturn(authFingerprint.getBytes(StandardCharsets.UTF_8));
|
||||
|
||||
final File flowFile = new File("src/test/resources/conf/reporting-task-with-cs-flow-0.7.0.xml");
|
||||
final String flow = IOUtils.toString(new FileInputStream(flowFile));
|
||||
when(proposedDataFlow.getFlow()).thenReturn(flow.getBytes(StandardCharsets.UTF_8));
|
||||
|
||||
controller.synchronize(standardFlowSynchronizer, proposedDataFlow);
|
||||
|
||||
// should be two controller services
|
||||
final Set<ControllerServiceNode> controllerServiceNodes = controller.getAllControllerServices();
|
||||
assertNotNull(controllerServiceNodes);
|
||||
assertEquals(2, controllerServiceNodes.size());
|
||||
|
||||
// find the controller service that was moved to the root group
|
||||
final ControllerServiceNode rootGroupCs = controllerServiceNodes.stream().filter(c -> c.getProcessGroup() != null).findFirst().get();
|
||||
assertNotNull(rootGroupCs);
|
||||
|
||||
// find the controller service that was not moved to the root group
|
||||
final ControllerServiceNode controllerCs = controllerServiceNodes.stream().filter(c -> c.getProcessGroup() == null).findFirst().get();
|
||||
assertNotNull(controllerCs);
|
||||
|
||||
// should be same class (not Ghost), different ids, and same properties
|
||||
assertEquals(rootGroupCs.getCanonicalClassName(), controllerCs.getCanonicalClassName());
|
||||
assertFalse(rootGroupCs.getCanonicalClassName().contains("Ghost"));
|
||||
assertNotEquals(rootGroupCs.getIdentifier(), controllerCs.getIdentifier());
|
||||
assertEquals(rootGroupCs.getProperties(), controllerCs.getProperties());
|
||||
|
||||
// should be one processor
|
||||
final Set<ProcessorNode> processorNodes = controller.getGroup(controller.getRootGroupId()).getProcessors();
|
||||
assertNotNull(processorNodes);
|
||||
assertEquals(1, processorNodes.size());
|
||||
|
||||
// verify the processor is still pointing at the controller service that got moved to the root group
|
||||
final ProcessorNode processorNode = processorNodes.stream().findFirst().get();
|
||||
final PropertyDescriptor procControllerServiceProp = processorNode.getProperties().entrySet().stream()
|
||||
.filter(e -> e.getValue().equals(rootGroupCs.getIdentifier()))
|
||||
.map(e -> e.getKey())
|
||||
.findFirst()
|
||||
.get();
|
||||
assertNotNull(procControllerServiceProp);
|
||||
|
||||
// should be one reporting task
|
||||
final Set<ReportingTaskNode> reportingTaskNodes = controller.getAllReportingTasks();
|
||||
assertNotNull(reportingTaskNodes);
|
||||
assertEquals(1, reportingTaskNodes.size());
|
||||
|
||||
// verify that the reporting task is pointing at the controller service at the controller level
|
||||
final ReportingTaskNode reportingTaskNode = reportingTaskNodes.stream().findFirst().get();
|
||||
final PropertyDescriptor reportingTaskControllerServiceProp = reportingTaskNode.getProperties().entrySet().stream()
|
||||
.filter(e -> e.getValue().equals(controllerCs.getIdentifier()))
|
||||
.map(e -> e.getKey())
|
||||
.findFirst()
|
||||
.get();
|
||||
assertNotNull(reportingTaskControllerServiceProp);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSynchronizeFlowWithProcessorReferencingControllerService() throws IOException {
|
||||
// create a mock proposed data flow with the same auth fingerprint as the current authorizer
|
||||
final String authFingerprint = authorizer.getFingerprint();
|
||||
final DataFlow proposedDataFlow = Mockito.mock(DataFlow.class);
|
||||
when(proposedDataFlow.getAuthorizerFingerprint()).thenReturn(authFingerprint.getBytes(StandardCharsets.UTF_8));
|
||||
|
||||
final File flowFile = new File("src/test/resources/conf/processor-with-cs-flow-0.7.0.xml");
|
||||
final String flow = IOUtils.toString(new FileInputStream(flowFile));
|
||||
when(proposedDataFlow.getFlow()).thenReturn(flow.getBytes(StandardCharsets.UTF_8));
|
||||
|
||||
controller.synchronize(standardFlowSynchronizer, proposedDataFlow);
|
||||
|
||||
// should be two controller services
|
||||
final Set<ControllerServiceNode> controllerServiceNodes = controller.getAllControllerServices();
|
||||
assertNotNull(controllerServiceNodes);
|
||||
assertEquals(1, controllerServiceNodes.size());
|
||||
|
||||
// find the controller service that was moved to the root group
|
||||
final ControllerServiceNode rootGroupCs = controllerServiceNodes.stream().filter(c -> c.getProcessGroup() != null).findFirst().get();
|
||||
assertNotNull(rootGroupCs);
|
||||
|
||||
// should be one processor
|
||||
final Set<ProcessorNode> processorNodes = controller.getGroup(controller.getRootGroupId()).getProcessors();
|
||||
assertNotNull(processorNodes);
|
||||
assertEquals(1, processorNodes.size());
|
||||
|
||||
// verify the processor is still pointing at the controller service that got moved to the root group
|
||||
final ProcessorNode processorNode = processorNodes.stream().findFirst().get();
|
||||
final PropertyDescriptor procControllerServiceProp = processorNode.getProperties().entrySet().stream()
|
||||
.filter(e -> e.getValue().equals(rootGroupCs.getIdentifier()))
|
||||
.map(e -> e.getKey())
|
||||
.findFirst()
|
||||
.get();
|
||||
assertNotNull(procControllerServiceProp);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSynchronizeFlowWhenAuthorizationsAreEqual() {
|
||||
// create a mock proposed data flow with the same auth fingerprint as the current authorizer
|
||||
|
|
|
@ -0,0 +1,52 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.controller.service.mock;
|
||||
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.controller.ControllerService;
|
||||
import org.apache.nifi.reporting.AbstractReportingTask;
|
||||
import org.apache.nifi.reporting.ReportingContext;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
public class DummyReportingTask extends AbstractReportingTask {
|
||||
|
||||
public static final PropertyDescriptor SERVICE = new PropertyDescriptor.Builder()
|
||||
.name("Controller Service")
|
||||
.identifiesControllerService(ControllerService.class)
|
||||
.required(true)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor PROP_FOO = new PropertyDescriptor.Builder()
|
||||
.name("Foo")
|
||||
.required(false)
|
||||
.build();
|
||||
|
||||
@Override
|
||||
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||
final List<PropertyDescriptor> descriptors = new ArrayList<>();
|
||||
descriptors.add(SERVICE);
|
||||
descriptors.add(PROP_FOO);
|
||||
return descriptors;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onTrigger(ReportingContext context) {
|
||||
|
||||
}
|
||||
}
|
|
@ -0,0 +1,45 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.controller.service.mock;
|
||||
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.controller.AbstractControllerService;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
public class ServiceD extends AbstractControllerService {
|
||||
|
||||
public static final PropertyDescriptor PROP_FOO1 = new PropertyDescriptor.Builder()
|
||||
.name("Foo1")
|
||||
.required(false)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor PROP_FOO2 = new PropertyDescriptor.Builder()
|
||||
.name("Foo2")
|
||||
.required(false)
|
||||
.build();
|
||||
|
||||
@Override
|
||||
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||
final List<PropertyDescriptor> descriptors = new ArrayList<>();
|
||||
descriptors.add(PROP_FOO1);
|
||||
descriptors.add(PROP_FOO2);
|
||||
return descriptors;
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,59 @@
|
|||
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
|
||||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
contributor license agreements. See the NOTICE file distributed with
|
||||
this work for additional information regarding copyright ownership.
|
||||
The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
(the "License"); you may not use this file except in compliance with
|
||||
the License. You may obtain a copy of the License at
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
-->
|
||||
<flowController>
|
||||
<maxTimerDrivenThreadCount>10</maxTimerDrivenThreadCount>
|
||||
<maxEventDrivenThreadCount>5</maxEventDrivenThreadCount>
|
||||
<rootGroup>
|
||||
<id>778f676e-6542-4c18-9d06-24b6fd3a1b29</id>
|
||||
<name>NiFi Flow</name>
|
||||
<position x="0.0" y="0.0"/>
|
||||
<comment/>
|
||||
<processor>
|
||||
<id>809cca74-cd11-4ffa-9831-39d446a8ed55</id>
|
||||
<name>DummyProcessor</name>
|
||||
<position x="670.0" y="235.0"/>
|
||||
<styles/>
|
||||
<comment/>
|
||||
<class>org.apache.nifi.controller.service.mock.DummyProcessor</class>
|
||||
<maxConcurrentTasks>1</maxConcurrentTasks>
|
||||
<schedulingPeriod>0 sec</schedulingPeriod>
|
||||
<penalizationPeriod>30 sec</penalizationPeriod>
|
||||
<yieldPeriod>1 sec</yieldPeriod>
|
||||
<bulletinLevel>WARN</bulletinLevel>
|
||||
<lossTolerant>false</lossTolerant>
|
||||
<scheduledState>STOPPED</scheduledState>
|
||||
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
|
||||
<runDurationNanos>0</runDurationNanos>
|
||||
<property>
|
||||
<name>Controller Service</name>
|
||||
<value>edf22ee5-376a-46dc-a38a-919351124457</value>
|
||||
</property>
|
||||
</processor>
|
||||
</rootGroup>
|
||||
<controllerServices>
|
||||
<controllerService>
|
||||
<id>edf22ee5-376a-46dc-a38a-919351124457</id>
|
||||
<name>ControllerService</name>
|
||||
<comment/>
|
||||
<class>org.apache.nifi.controller.service.mock.ServiceD</class>
|
||||
<enabled>false</enabled>
|
||||
<property>
|
||||
<name>Foo1</name>
|
||||
<value>Bar1</value>
|
||||
</property>
|
||||
</controllerService>
|
||||
</controllerServices>
|
||||
</flowController>
|
|
@ -0,0 +1,74 @@
|
|||
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
|
||||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
contributor license agreements. See the NOTICE file distributed with
|
||||
this work for additional information regarding copyright ownership.
|
||||
The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
(the "License"); you may not use this file except in compliance with
|
||||
the License. You may obtain a copy of the License at
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
-->
|
||||
<flowController>
|
||||
<maxTimerDrivenThreadCount>10</maxTimerDrivenThreadCount>
|
||||
<maxEventDrivenThreadCount>5</maxEventDrivenThreadCount>
|
||||
<rootGroup>
|
||||
<id>778f676e-6542-4c18-9d06-24b6fd3a1b29</id>
|
||||
<name>NiFi Flow</name>
|
||||
<position x="0.0" y="0.0"/>
|
||||
<comment/>
|
||||
<processor>
|
||||
<id>809cca74-cd11-4ffa-9831-39d446a8ed54</id>
|
||||
<name>DummyProcessor</name>
|
||||
<position x="670.0" y="235.0"/>
|
||||
<styles/>
|
||||
<comment/>
|
||||
<class>org.apache.nifi.controller.service.mock.DummyProcessor</class>
|
||||
<maxConcurrentTasks>1</maxConcurrentTasks>
|
||||
<schedulingPeriod>0 sec</schedulingPeriod>
|
||||
<penalizationPeriod>30 sec</penalizationPeriod>
|
||||
<yieldPeriod>1 sec</yieldPeriod>
|
||||
<bulletinLevel>WARN</bulletinLevel>
|
||||
<lossTolerant>false</lossTolerant>
|
||||
<scheduledState>STOPPED</scheduledState>
|
||||
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
|
||||
<runDurationNanos>0</runDurationNanos>
|
||||
<property>
|
||||
<name>Controller Service</name>
|
||||
<value>edf22ee5-376a-46dc-a38a-919351124456</value>
|
||||
</property>
|
||||
</processor>
|
||||
</rootGroup>
|
||||
<controllerServices>
|
||||
<controllerService>
|
||||
<id>edf22ee5-376a-46dc-a38a-919351124456</id>
|
||||
<name>ControllerService</name>
|
||||
<comment/>
|
||||
<class>org.apache.nifi.controller.service.mock.ServiceD</class>
|
||||
<enabled>false</enabled>
|
||||
<property>
|
||||
<name>Foo1</name>
|
||||
<value>Bar1</value>
|
||||
</property>
|
||||
</controllerService>
|
||||
</controllerServices>
|
||||
<reportingTasks>
|
||||
<reportingTask>
|
||||
<id>fb9b40ce-608f-4a2f-9822-3899f695f699</id>
|
||||
<name>ReportingTask</name>
|
||||
<comment/>
|
||||
<class>org.apache.nifi.controller.service.mock.DummyReportingTask</class>
|
||||
<schedulingPeriod>5 mins</schedulingPeriod>
|
||||
<scheduledState>STOPPED</scheduledState>
|
||||
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
|
||||
<property>
|
||||
<name>Controller Service</name>
|
||||
<value>edf22ee5-376a-46dc-a38a-919351124456</value>
|
||||
</property>
|
||||
</reportingTask>
|
||||
</reportingTasks>
|
||||
</flowController>
|
Loading…
Reference in New Issue