diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
index 5b2509df41..0e0c74b7d3 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
@@ -21,6 +21,7 @@ import org.apache.nifi.authorization.AbstractPolicyBasedAuthorizer;
 import org.apache.nifi.authorization.Authorizer;
 import org.apache.nifi.cluster.protocol.DataFlow;
 import org.apache.nifi.cluster.protocol.StandardDataFlow;
+import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.connectable.Connectable;
 import org.apache.nifi.connectable.ConnectableType;
 import org.apache.nifi.connectable.Connection;
@@ -103,11 +104,13 @@ import java.nio.file.StandardOpenOption;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 import java.util.zip.GZIPInputStream;
 
 /**
@@ -298,6 +301,21 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
                 }
             }
 
+            // get all the reporting task elements
+            final Element reportingTasksElement = DomUtils.getChild(rootElement, "reportingTasks");
+            final List<Element> reportingTaskElements = new ArrayList<>();
+            if (reportingTasksElement != null) {
+                reportingTaskElements.addAll(DomUtils.getChildElementsByTagName(reportingTasksElement, "reportingTask"));
+            }
+
+            // get/create all the reporting task nodes and DTOs, but don't apply their scheduled state yet
+            final Map<ReportingTaskNode, ReportingTaskDTO> reportingTaskNodesToDTOs = new HashMap<>();
+            for (final Element taskElement : reportingTaskElements) {
+                final ReportingTaskDTO dto = FlowFromDOMFactory.getReportingTask(taskElement, encryptor);
+                final ReportingTaskNode reportingTask = getOrCreateReportingTask(controller, dto, initialized, existingFlowEmpty);
+                reportingTaskNodesToDTOs.put(reportingTask, dto);
+            }
+
             final Element controllerServicesElement = DomUtils.getChild(rootElement, "controllerServices");
             if (controllerServicesElement != null) {
                 final List<Element> serviceElements = DomUtils.getChildElementsByTagName(controllerServicesElement, "controllerService");
@@ -308,7 +326,40 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
                     // to the root Group. Otherwise, we want to use a null group, which indicates a Controller-level
                     // Controller Service.
                     final ProcessGroup group = (encodingVersion == null) ? rootGroup : null;
-                    ControllerServiceLoader.loadControllerServices(serviceElements, controller, group, encryptor, controller.getBulletinRepository(), autoResumeState);
+                    final Map<ControllerServiceNode, Element> controllerServices = ControllerServiceLoader.loadControllerServices(serviceElements, controller, group, encryptor);
+
+                    // If we are moving controller services to the root group we also need to see if any reporting tasks
+                    // reference them, and if so we need to clone the CS and update the reporting task reference
+                    if (group != null) {
+                        // find all the controller service ids referenced by reporting tasks
+                        final Set<String> controllerServicesInReportingTasks = reportingTaskNodesToDTOs.keySet().stream()
+                                .flatMap(r -> r.getProperties().entrySet().stream())
+                                .filter(e -> e.getKey().getControllerServiceDefinition() != null)
+                                .map(e -> e.getValue())
+                                .collect(Collectors.toSet());
+
+                        // find the controller service nodes for each id referenced by a reporting task
+                        final Set<ControllerServiceNode> controllerServicesToClone = controllerServices.keySet().stream()
+                                .filter(cs -> controllerServicesInReportingTasks.contains(cs.getIdentifier()))
+                                .collect(Collectors.toSet());
+
+                        // clone the controller services and map the original id to the clone
+                        final Map<String, ControllerServiceNode> controllerServiceMapping = new HashMap<>();
+                        for (ControllerServiceNode controllerService : controllerServicesToClone) {
+                            final ControllerServiceNode clone = ControllerServiceLoader.cloneControllerService(controller, controllerService);
+                            controller.addRootControllerService(clone);
+                            controllerServiceMapping.put(controllerService.getIdentifier(), clone);
+                        }
+
+                        // update the reporting tasks to reference the cloned controller services
+                        updateReportingTaskControllerServices(reportingTaskNodesToDTOs.keySet(), controllerServiceMapping);
+
+                        // enable all the cloned controller services
+                        ControllerServiceLoader.enableControllerServices(controllerServiceMapping.values(), controller, autoResumeState);
+                    }
+
+                    // enable all the original controller services
+                    ControllerServiceLoader.enableControllerServices(controllerServices, controller, encryptor, autoResumeState);
                 } else {
                     for (final Element serviceElement : serviceElements) {
                         updateControllerService(controller, serviceElement, encryptor);
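The hunk above is the heart of the change: loading controller services is now decoupled from enabling them, so that any service a reporting task references can be cloned to the controller level and re-pointed before anything starts. A minimal sketch of the intended call sequence, illustrative only and not part of the patch (assumes serviceElements, controller, group, encryptor and autoResumeState are already in scope):

    // load: create and configure the ControllerServiceNodes, but do not enable anything yet
    final Map<ControllerServiceNode, Element> services =
            ControllerServiceLoader.loadControllerServices(serviceElements, controller, group, encryptor);
    // ... clone and re-point any services that must also exist at the controller level ...
    // enable: only after every reference has been settled
    ControllerServiceLoader.enableControllerServices(services, controller, encryptor, autoResumeState);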
@@ -318,16 +369,9 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
 
             scaleRootGroup(rootGroup, encodingVersion);
 
-            final Element reportingTasksElement = DomUtils.getChild(rootElement, "reportingTasks");
-            if (reportingTasksElement != null) {
-                final List<Element> taskElements = DomUtils.getChildElementsByTagName(reportingTasksElement, "reportingTask");
-                for (final Element taskElement : taskElements) {
-                    if (!initialized || existingFlowEmpty) {
-                        addReportingTask(controller, taskElement, encryptor);
-                    } else {
-                        updateReportingTask(controller, taskElement, encryptor);
-                    }
-                }
+            // now that controller services are loaded and enabled we can apply the scheduled state to each reporting task
+            for (Map.Entry<ReportingTaskNode, ReportingTaskDTO> entry : reportingTaskNodesToDTOs.entrySet()) {
+                applyReportingTaskScheduleState(controller, entry.getValue(), entry.getKey(), initialized, existingFlowEmpty);
             }
         }
     }
@@ -359,6 +403,23 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
         }
     }
 
+    private void updateReportingTaskControllerServices(final Set<ReportingTaskNode> reportingTasks, final Map<String, ControllerServiceNode> controllerServiceMapping) {
+        for (ReportingTaskNode reportingTask : reportingTasks) {
+            if (reportingTask.getProperties() != null) {
+                final Set<Map.Entry<PropertyDescriptor, String>> propertyDescriptors = reportingTask.getProperties().entrySet().stream()
+                        .filter(e -> e.getKey().getControllerServiceDefinition() != null)
+                        .filter(e -> controllerServiceMapping.containsKey(e.getValue()))
+                        .collect(Collectors.toSet());
+
+                for (Map.Entry<PropertyDescriptor, String> propEntry : propertyDescriptors) {
+                    final PropertyDescriptor propertyDescriptor = propEntry.getKey();
+                    final ControllerServiceNode clone = controllerServiceMapping.get(propEntry.getValue());
+                    reportingTask.setProperty(propertyDescriptor.getName(), clone.getIdentifier());
+                }
+            }
+        }
+    }
+
     private void addLocalTemplates(final Element processGroupElement, final ProcessGroup processGroup, final FlowEncodingVersion encodingVersion) {
         // Replace the templates with those from the proposed flow
         final List<Element> templateNodeList = getChildrenByTagName(processGroupElement, "template");
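Both filters in updateReportingTaskControllerServices key off PropertyDescriptor.getControllerServiceDefinition(), which is non-null only for descriptors declared with identifiesControllerService(...), such as the SERVICE property of the mock reporting task added later in this patch. A small illustrative snippet (not part of the patch):

    // A descriptor built this way reports a non-null controller service definition,
    // so the synchronizer treats the property's value as a controller service id.
    final PropertyDescriptor service = new PropertyDescriptor.Builder()
            .name("Controller Service")
            .identifiesControllerService(ControllerService.class)
            .required(true)
            .build();
    // service.getControllerServiceDefinition() returns ControllerService.class here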
@@ -461,35 +522,53 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
         }
     }
 
-    private void addReportingTask(final FlowController controller, final Element reportingTaskElement, final StringEncryptor encryptor) throws ReportingTaskInstantiationException {
-        final ReportingTaskDTO dto = FlowFromDOMFactory.getReportingTask(reportingTaskElement, encryptor);
+    private ReportingTaskNode getOrCreateReportingTask(final FlowController controller, final ReportingTaskDTO dto, final boolean controllerInitialized, final boolean existingFlowEmpty)
+            throws ReportingTaskInstantiationException {
+        // create a new reporting task node when the controller is not initialized or the flow is empty
+        if (!controllerInitialized || existingFlowEmpty) {
+            final ReportingTaskNode reportingTask = controller.createReportingTask(dto.getType(), dto.getId(), false);
+            reportingTask.setName(dto.getName());
+            reportingTask.setComments(dto.getComments());
+            reportingTask.setScheduldingPeriod(dto.getSchedulingPeriod());
+            reportingTask.setSchedulingStrategy(SchedulingStrategy.valueOf(dto.getSchedulingStrategy()));
 
-        final ReportingTaskNode reportingTask = controller.createReportingTask(dto.getType(), dto.getId(), false);
-        reportingTask.setName(dto.getName());
-        reportingTask.setComments(dto.getComments());
-        reportingTask.setScheduldingPeriod(dto.getSchedulingPeriod());
-        reportingTask.setSchedulingStrategy(SchedulingStrategy.valueOf(dto.getSchedulingStrategy()));
+            reportingTask.setAnnotationData(dto.getAnnotationData());
 
-        reportingTask.setAnnotationData(dto.getAnnotationData());
-
-        for (final Map.Entry<String, String> entry : dto.getProperties().entrySet()) {
-            if (entry.getValue() == null) {
-                reportingTask.removeProperty(entry.getKey());
-            } else {
-                reportingTask.setProperty(entry.getKey(), entry.getValue());
+            for (final Map.Entry<String, String> entry : dto.getProperties().entrySet()) {
+                if (entry.getValue() == null) {
+                    reportingTask.removeProperty(entry.getKey());
+                } else {
+                    reportingTask.setProperty(entry.getKey(), entry.getValue());
+                }
             }
+
+            final ComponentLog componentLog = new SimpleProcessLogger(dto.getId(), reportingTask.getReportingTask());
+            final ReportingInitializationContext config = new StandardReportingInitializationContext(dto.getId(), dto.getName(),
+                    SchedulingStrategy.valueOf(dto.getSchedulingStrategy()), dto.getSchedulingPeriod(), componentLog, controller);
+
+            try {
+                reportingTask.getReportingTask().initialize(config);
+            } catch (final InitializationException ie) {
+                throw new ReportingTaskInstantiationException("Failed to initialize reporting task of type " + dto.getType(), ie);
+            }
+
+            return reportingTask;
+        } else {
+            // otherwise return the existing reporting task node
+            return controller.getReportingTaskNode(dto.getId());
         }
+    }
 
-        final ComponentLog componentLog = new SimpleProcessLogger(dto.getId(), reportingTask.getReportingTask());
-        final ReportingInitializationContext config = new StandardReportingInitializationContext(dto.getId(), dto.getName(),
-                SchedulingStrategy.valueOf(dto.getSchedulingStrategy()), dto.getSchedulingPeriod(), componentLog, controller);
-
-        try {
-            reportingTask.getReportingTask().initialize(config);
-        } catch (final InitializationException ie) {
-            throw new ReportingTaskInstantiationException("Failed to initialize reporting task of type " + dto.getType(), ie);
+    private void applyReportingTaskScheduleState(final FlowController controller, final ReportingTaskDTO dto, final ReportingTaskNode reportingTask,
+            final boolean controllerInitialized, final boolean existingFlowEmpty) {
+        if (!controllerInitialized || existingFlowEmpty) {
+            applyNewReportingTaskScheduleState(controller, dto, reportingTask);
+        } else {
+            applyExistingReportingTaskScheduleState(controller, dto, reportingTask);
         }
+    }
 
+    private void applyNewReportingTaskScheduleState(final FlowController controller, final ReportingTaskDTO dto, final ReportingTaskNode reportingTask) {
         if (autoResumeState) {
             if (ScheduledState.RUNNING.name().equals(dto.getState())) {
                 try {
@@ -517,10 +596,7 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
         }
     }
 
-    private void updateReportingTask(final FlowController controller, final Element reportingTaskElement, final StringEncryptor encryptor) {
-        final ReportingTaskDTO dto = FlowFromDOMFactory.getReportingTask(reportingTaskElement, encryptor);
-        final ReportingTaskNode taskNode = controller.getReportingTaskNode(dto.getId());
-
+    private void applyExistingReportingTaskScheduleState(final FlowController controller, final ReportingTaskDTO dto, final ReportingTaskNode taskNode) {
         if (!taskNode.getScheduledState().name().equals(dto.getState())) {
             try {
                 switch (ScheduledState.valueOf(dto.getState())) {
@@ -863,7 +939,8 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
         // Add Controller Services
         final List<Element> serviceNodeList = getChildrenByTagName(processGroupElement, "controllerService");
         if (!serviceNodeList.isEmpty()) {
-            ControllerServiceLoader.loadControllerServices(serviceNodeList, controller, processGroup, encryptor, controller.getBulletinRepository(), autoResumeState);
+            final Map<ControllerServiceNode, Element> controllerServices = ControllerServiceLoader.loadControllerServices(serviceNodeList, controller, processGroup, encryptor);
+            ControllerServiceLoader.enableControllerServices(controllerServices, controller, encryptor, autoResumeState);
         }
 
         // add processors
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java
index d302cff7fa..8b3dcf4b7e 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java
@@ -19,6 +19,7 @@ package org.apache.nifi.controller.service;
 import java.io.BufferedInputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -26,11 +27,13 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.UUID;
 
 import javax.xml.parsers.DocumentBuilder;
 import javax.xml.parsers.DocumentBuilderFactory;
 import javax.xml.parsers.ParserConfigurationException;
 
+import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.controller.FlowController;
 import org.apache.nifi.controller.serialization.FlowFromDOMFactory;
 import org.apache.nifi.encrypt.StringEncryptor;
@@ -91,14 +94,17 @@ public class ControllerServiceLoader {
             final Document document = builder.parse(in);
             final Element controllerServices = document.getDocumentElement();
             final List<Element> serviceElements = DomUtils.getChildElementsByTagName(controllerServices, "controllerService");
-            return new ArrayList<>(loadControllerServices(serviceElements, controller, parentGroup, encryptor, bulletinRepo, autoResumeState));
+
+            final Map<ControllerServiceNode, Element> controllerServiceMap = ControllerServiceLoader.loadControllerServices(serviceElements, controller, parentGroup, encryptor);
+            enableControllerServices(controllerServiceMap, controller, encryptor, autoResumeState);
+            return new ArrayList<>(controllerServiceMap.keySet());
         } catch (SAXException | ParserConfigurationException sxe) {
             throw new IOException(sxe);
         }
     }
 
-    public static Collection<ControllerServiceNode> loadControllerServices(final List<Element> serviceElements, final FlowController controller, final ProcessGroup parentGroup,
-            final StringEncryptor encryptor, final BulletinRepository bulletinRepo, final boolean autoResumeState) {
+    public static Map<ControllerServiceNode, Element> loadControllerServices(final List<Element> serviceElements, final FlowController controller,
+            final ProcessGroup parentGroup, final StringEncryptor encryptor) {
 
         final Map<ControllerServiceNode, Element> nodeMap = new HashMap<>();
         for (final Element serviceElement : serviceElements) {
@@ -117,6 +123,11 @@ public class ControllerServiceLoader {
             configureControllerService(entry.getKey(), entry.getValue(), encryptor);
         }
 
+        return nodeMap;
+    }
+
+    public static void enableControllerServices(final Map<ControllerServiceNode, Element> nodeMap, final FlowController controller,
+            final StringEncryptor encryptor, final boolean autoResumeState) {
         // Start services
         if (autoResumeState) {
             final Set<ControllerServiceNode> nodesToEnable = new HashSet<>();
@@ -135,10 +146,34 @@ public class ControllerServiceLoader {
                 }
             }
 
+            enableControllerServices(nodesToEnable, controller, autoResumeState);
+        }
+    }
+
+    public static void enableControllerServices(final Collection<ControllerServiceNode> nodesToEnable, final FlowController controller, final boolean autoResumeState) {
+        // Start services
+        if (autoResumeState) {
             controller.enableControllerServices(nodesToEnable);
         }
+    }
 
-        return nodeMap.keySet();
+    public static ControllerServiceNode cloneControllerService(final ControllerServiceProvider provider, final ControllerServiceNode controllerService) {
+        // create a new id for the clone seeded from the original id so that it is consistent in a cluster
+        final UUID id = UUID.nameUUIDFromBytes(controllerService.getIdentifier().getBytes(StandardCharsets.UTF_8));
+
+        final ControllerServiceNode clone = provider.createControllerService(controllerService.getCanonicalClassName(), id.toString(), false);
+        clone.setName(controllerService.getName());
+        clone.setComments(controllerService.getComments());
+
+        if (controllerService.getProperties() != null) {
+            for (Map.Entry<PropertyDescriptor, String> propEntry : controllerService.getProperties().entrySet()) {
+                if (propEntry.getValue() != null) {
+                    clone.setProperty(propEntry.getKey().getName(), propEntry.getValue());
+                }
+            }
+        }
+
+        return clone;
     }
 
     private static ControllerServiceNode createControllerService(final ControllerServiceProvider provider, final Element controllerServiceElement, final StringEncryptor encryptor) {
@@ -162,4 +197,5 @@ public class ControllerServiceLoader {
             }
         }
     }
+
 }
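cloneControllerService seeds the clone's id from the original id so that every node in a cluster derives the same clone id without any coordination. A self-contained illustration of that determinism (illustrative only; the concrete UUID value is not asserted anywhere in the patch):

    import java.nio.charset.StandardCharsets;
    import java.util.UUID;

    public class CloneIdDemo {
        public static void main(String[] args) {
            // id of the controller service in the 0.7.0 test flow below
            final String originalId = "edf22ee5-376a-46dc-a38a-919351124456";
            final UUID first = UUID.nameUUIDFromBytes(originalId.getBytes(StandardCharsets.UTF_8));
            final UUID second = UUID.nameUUIDFromBytes(originalId.getBytes(StandardCharsets.UTF_8));
            // name-based (type 3) UUIDs are a pure function of their input,
            // so every node computes the same clone id for the same original id
            System.out.println(first.equals(second)); // true
            System.out.println(first);                // identical on every node and JVM
        }
    }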
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFlowController.java
index 60530c8c8b..71af93f10b 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFlowController.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFlowController.java
@@ -17,6 +17,7 @@
 
 package org.apache.nifi.controller;
 
+import org.apache.commons.io.IOUtils;
 import org.apache.nifi.admin.service.AuditService;
 import org.apache.nifi.authorization.AbstractPolicyBasedAuthorizer;
 import org.apache.nifi.authorization.AccessPolicy;
@@ -41,6 +42,9 @@ import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
 
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.LinkedHashSet;
 import java.util.Set;
@@ -127,6 +131,104 @@ public class TestFlowController {
         controller.shutdown(true);
     }
 
+    @Test
+    public void testSynchronizeFlowWithReportingTaskAndProcessorReferencingControllerService() throws IOException {
+        // create a mock proposed data flow with the same auth fingerprint as the current authorizer
+        final String authFingerprint = authorizer.getFingerprint();
+        final DataFlow proposedDataFlow = Mockito.mock(DataFlow.class);
+        when(proposedDataFlow.getAuthorizerFingerprint()).thenReturn(authFingerprint.getBytes(StandardCharsets.UTF_8));
+
+        final File flowFile = new File("src/test/resources/conf/reporting-task-with-cs-flow-0.7.0.xml");
+        final String flow = IOUtils.toString(new FileInputStream(flowFile));
+        when(proposedDataFlow.getFlow()).thenReturn(flow.getBytes(StandardCharsets.UTF_8));
+
+        controller.synchronize(standardFlowSynchronizer, proposedDataFlow);
+
+        // should be two controller services
+        final Set<ControllerServiceNode> controllerServiceNodes = controller.getAllControllerServices();
+        assertNotNull(controllerServiceNodes);
+        assertEquals(2, controllerServiceNodes.size());
+
+        // find the controller service that was moved to the root group
+        final ControllerServiceNode rootGroupCs = controllerServiceNodes.stream().filter(c -> c.getProcessGroup() != null).findFirst().get();
+        assertNotNull(rootGroupCs);
+
+        // find the controller service that was not moved to the root group
+        final ControllerServiceNode controllerCs = controllerServiceNodes.stream().filter(c -> c.getProcessGroup() == null).findFirst().get();
+        assertNotNull(controllerCs);
+
+        // should be same class (not Ghost), different ids, and same properties
+        assertEquals(rootGroupCs.getCanonicalClassName(), controllerCs.getCanonicalClassName());
+        assertFalse(rootGroupCs.getCanonicalClassName().contains("Ghost"));
+        assertNotEquals(rootGroupCs.getIdentifier(), controllerCs.getIdentifier());
+        assertEquals(rootGroupCs.getProperties(), controllerCs.getProperties());
+
+        // should be one processor
+        final Set<ProcessorNode> processorNodes = controller.getGroup(controller.getRootGroupId()).getProcessors();
+        assertNotNull(processorNodes);
+        assertEquals(1, processorNodes.size());
+
+        // verify the processor is still pointing at the controller service that got moved to the root group
+        final ProcessorNode processorNode = processorNodes.stream().findFirst().get();
+        final PropertyDescriptor procControllerServiceProp = processorNode.getProperties().entrySet().stream()
+                .filter(e -> e.getValue().equals(rootGroupCs.getIdentifier()))
+                .map(e -> e.getKey())
+                .findFirst()
+                .get();
+        assertNotNull(procControllerServiceProp);
+
+        // should be one reporting task
+        final Set<ReportingTaskNode> reportingTaskNodes = controller.getAllReportingTasks();
+        assertNotNull(reportingTaskNodes);
+        assertEquals(1, reportingTaskNodes.size());
+
+        // verify that the reporting task is pointing at the controller service at the controller level
+        final ReportingTaskNode reportingTaskNode = reportingTaskNodes.stream().findFirst().get();
+        final PropertyDescriptor reportingTaskControllerServiceProp = reportingTaskNode.getProperties().entrySet().stream()
+                .filter(e -> e.getValue().equals(controllerCs.getIdentifier()))
+                .map(e -> e.getKey())
+                .findFirst()
+                .get();
+        assertNotNull(reportingTaskControllerServiceProp);
+    }
+
+    @Test
+    public void testSynchronizeFlowWithProcessorReferencingControllerService() throws IOException {
+        // create a mock proposed data flow with the same auth fingerprint as the current authorizer
+        final String authFingerprint = authorizer.getFingerprint();
+        final DataFlow proposedDataFlow = Mockito.mock(DataFlow.class);
+        when(proposedDataFlow.getAuthorizerFingerprint()).thenReturn(authFingerprint.getBytes(StandardCharsets.UTF_8));
+
+        final File flowFile = new File("src/test/resources/conf/processor-with-cs-flow-0.7.0.xml");
+        final String flow = IOUtils.toString(new FileInputStream(flowFile));
+        when(proposedDataFlow.getFlow()).thenReturn(flow.getBytes(StandardCharsets.UTF_8));
+
+        controller.synchronize(standardFlowSynchronizer, proposedDataFlow);
+
+        // should be one controller service
+        final Set<ControllerServiceNode> controllerServiceNodes = controller.getAllControllerServices();
+        assertNotNull(controllerServiceNodes);
+        assertEquals(1, controllerServiceNodes.size());
+
+        // find the controller service that was loaded into the root group
+        final ControllerServiceNode rootGroupCs = controllerServiceNodes.stream().filter(c -> c.getProcessGroup() != null).findFirst().get();
+        assertNotNull(rootGroupCs);
+
+        // should be one processor
+        final Set<ProcessorNode> processorNodes = controller.getGroup(controller.getRootGroupId()).getProcessors();
+        assertNotNull(processorNodes);
+        assertEquals(1, processorNodes.size());
+
+        // verify the processor is pointing at the controller service in the root group
+        final ProcessorNode processorNode = processorNodes.stream().findFirst().get();
+        final PropertyDescriptor procControllerServiceProp = processorNode.getProperties().entrySet().stream()
+                .filter(e -> e.getValue().equals(rootGroupCs.getIdentifier()))
+                .map(e -> e.getKey())
+                .findFirst()
+                .get();
+        assertNotNull(procControllerServiceProp);
+    }
+
     @Test
     public void testSynchronizeFlowWhenAuthorizationsAreEqual() {
         // create a mock proposed data flow with the same auth fingerprint as the current authorizer
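A side note on the tests above (an observation, not a change to the patch): IOUtils.toString(InputStream) without an explicit charset relies on the platform default encoding and is deprecated in more recent commons-io releases; the charset-explicit overload reads the flow deterministically:

    final String flow = IOUtils.toString(new FileInputStream(flowFile), StandardCharsets.UTF_8);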
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/DummyReportingTask.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/DummyReportingTask.java
new file mode 100644
index 0000000000..cb74c76d89
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/DummyReportingTask.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.service.mock;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.reporting.AbstractReportingTask;
+import org.apache.nifi.reporting.ReportingContext;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class DummyReportingTask extends AbstractReportingTask {
+
+    public static final PropertyDescriptor SERVICE = new PropertyDescriptor.Builder()
+            .name("Controller Service")
+            .identifiesControllerService(ControllerService.class)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor PROP_FOO = new PropertyDescriptor.Builder()
+            .name("Foo")
+            .required(false)
+            .build();
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> descriptors = new ArrayList<>();
+        descriptors.add(SERVICE);
+        descriptors.add(PROP_FOO);
+        return descriptors;
+    }
+
+    @Override
+    public void onTrigger(ReportingContext context) {
+
+    }
+}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/ServiceD.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/ServiceD.java
new file mode 100644
index 0000000000..b61beadd49
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/ServiceD.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.service.mock;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class ServiceD extends AbstractControllerService {
+
+    public static final PropertyDescriptor PROP_FOO1 = new PropertyDescriptor.Builder()
+            .name("Foo1")
+            .required(false)
+            .build();
+
+    public static final PropertyDescriptor PROP_FOO2 = new PropertyDescriptor.Builder()
+            .name("Foo2")
+            .required(false)
+            .build();
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> descriptors = new ArrayList<>();
+        descriptors.add(PROP_FOO1);
+        descriptors.add(PROP_FOO2);
+        return descriptors;
+    }
+
+}
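For reference, the two mocks above are wired together purely by id: the reporting task's "Controller Service" property holds a controller service node's identifier, which is exactly the value StandardFlowSynchronizer remaps when it clones the service to the controller level. Illustrative wiring, not part of the patch (assumes a reportingTaskNode and serviceNode obtained from a FlowController):

    // point the mock reporting task at the mock service by identifier;
    // after synchronization the synchronizer rewrites this value to the clone's id
    reportingTaskNode.setProperty(DummyReportingTask.SERVICE.getName(), serviceNode.getIdentifier());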
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/conf/processor-with-cs-flow-0.7.0.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/conf/processor-with-cs-flow-0.7.0.xml
new file mode 100644
index 0000000000..7703a0b272
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/conf/processor-with-cs-flow-0.7.0.xml
@@ -0,0 +1,59 @@
+
+
+ 10
+ 5
+
+ 778f676e-6542-4c18-9d06-24b6fd3a1b29
+ NiFi Flow
+
+
+
+ 809cca74-cd11-4ffa-9831-39d446a8ed55
+ DummyProcessor
+
+
+
+ org.apache.nifi.controller.service.mock.DummyProcessor
+ 1
+ 0 sec
+ 30 sec
+ 1 sec
+ WARN
+ false
+ STOPPED
+ TIMER_DRIVEN
+ 0
+
+ Controller Service
+ edf22ee5-376a-46dc-a38a-919351124457
+
+
+
+
+
+ edf22ee5-376a-46dc-a38a-919351124457
+ ControllerService
+
+ org.apache.nifi.controller.service.mock.ServiceD
+ false
+
+ Foo1
+ Bar1
+
+
+
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/conf/reporting-task-with-cs-flow-0.7.0.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/conf/reporting-task-with-cs-flow-0.7.0.xml
new file mode 100644
index 0000000000..b9cff579fb
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/conf/reporting-task-with-cs-flow-0.7.0.xml
@@ -0,0 +1,74 @@
+
+
+ 10
+ 5
+
+ 778f676e-6542-4c18-9d06-24b6fd3a1b29
+ NiFi Flow
+
+
+
+ 809cca74-cd11-4ffa-9831-39d446a8ed54
+ DummyProcessor
+
+
+
+ org.apache.nifi.controller.service.mock.DummyProcessor
+ 1
+ 0 sec
+ 30 sec
+ 1 sec
+ WARN
+ false
+ STOPPED
+ TIMER_DRIVEN
+ 0
+
+ Controller Service
+ edf22ee5-376a-46dc-a38a-919351124456
+
+
+
+
+
+ edf22ee5-376a-46dc-a38a-919351124456
+ ControllerService
+
+ org.apache.nifi.controller.service.mock.ServiceD
+ false
+
+ Foo1
+ Bar1
+
+
+
+
+
+ fb9b40ce-608f-4a2f-9822-3899f695f699
+ ReportingTask
+
+ org.apache.nifi.controller.service.mock.DummyReportingTask
+ 5 mins
+ STOPPED
+ TIMER_DRIVEN
+
+ Controller Service
+ edf22ee5-376a-46dc-a38a-919351124456
+
+
+