From a73b5bda4233fd28f3ddeca789ec6150a4e9878f Mon Sep 17 00:00:00 2001 From: Koji Kawamura Date: Mon, 30 Oct 2017 10:19:07 +0900 Subject: [PATCH] NIFI-4546: Make ReportingTask aware of node type in a cluster Signed-off-by: Pierre Villard This closes #2235. --- .../nifi/reporting/AbstractReportingTask.java | 14 ++++++++++++-- .../reporting/ReportingInitializationContext.java | 9 +++++++++ .../util/MockReportingInitializationContext.java | 6 ++++++ .../org/apache/nifi/controller/FlowController.java | 2 +- .../nifi/controller/StandardFlowSynchronizer.java | 2 +- .../StandardReportingInitializationContext.java | 11 ++++++++++- .../scheduling/TestStandardProcessScheduler.java | 2 +- .../mock/MockReportingInitializationContext.java | 6 ++++++ 8 files changed, 46 insertions(+), 6 deletions(-) diff --git a/nifi-api/src/main/java/org/apache/nifi/reporting/AbstractReportingTask.java b/nifi-api/src/main/java/org/apache/nifi/reporting/AbstractReportingTask.java index b5afe17a9f..93d23b22f8 100644 --- a/nifi-api/src/main/java/org/apache/nifi/reporting/AbstractReportingTask.java +++ b/nifi-api/src/main/java/org/apache/nifi/reporting/AbstractReportingTask.java @@ -20,8 +20,8 @@ import java.util.concurrent.TimeUnit; import org.apache.nifi.components.AbstractConfigurableComponent; import org.apache.nifi.controller.ControllerServiceLookup; +import org.apache.nifi.controller.NodeTypeProvider; import org.apache.nifi.logging.ComponentLog; -import org.apache.nifi.processor.ProcessorInitializationContext; public abstract class AbstractReportingTask extends AbstractConfigurableComponent implements ReportingTask { @@ -30,6 +30,7 @@ public abstract class AbstractReportingTask extends AbstractConfigurableComponen private long schedulingNanos; private ControllerServiceLookup serviceLookup; private ComponentLog logger; + private NodeTypeProvider nodeTypeProvider; @Override public final void initialize(final ReportingInitializationContext config) throws InitializationException { @@ -38,18 +39,27 @@ public abstract class AbstractReportingTask extends AbstractConfigurableComponen name = config.getName(); schedulingNanos = config.getSchedulingPeriod(TimeUnit.NANOSECONDS); serviceLookup = config.getControllerServiceLookup(); + nodeTypeProvider = config.getNodeTypeProvider(); init(config); } /** * @return the {@link ControllerServiceLookup} that was passed to the - * {@link #init(ProcessorInitializationContext)} method + * {@link #initialize(ReportingInitializationContext)} method */ protected final ControllerServiceLookup getControllerServiceLookup() { return serviceLookup; } + /** + * @return the {@link NodeTypeProvider} that was passed to the + * {@link #initialize(ReportingInitializationContext)} method + */ + protected final NodeTypeProvider getNodeTypeProvider() { + return nodeTypeProvider; + } + /** * @return the identifier of this Reporting Task */ diff --git a/nifi-api/src/main/java/org/apache/nifi/reporting/ReportingInitializationContext.java b/nifi-api/src/main/java/org/apache/nifi/reporting/ReportingInitializationContext.java index df64e03493..0bf49d3976 100644 --- a/nifi-api/src/main/java/org/apache/nifi/reporting/ReportingInitializationContext.java +++ b/nifi-api/src/main/java/org/apache/nifi/reporting/ReportingInitializationContext.java @@ -19,6 +19,7 @@ package org.apache.nifi.reporting; import java.util.concurrent.TimeUnit; import org.apache.nifi.controller.ControllerServiceLookup; +import org.apache.nifi.controller.NodeTypeProvider; import org.apache.nifi.kerberos.KerberosContext; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.scheduling.SchedulingStrategy; @@ -75,4 +76,12 @@ public interface ReportingInitializationContext extends KerberosContext { * way and generate bulletins when appropriate */ ComponentLog getLogger(); + + /** + * @return the {@link NodeTypeProvider} which can be used to detect the node + * type of this NiFi instance. + * @since Apache NiFi 1.5.0 + */ + NodeTypeProvider getNodeTypeProvider(); + } diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockReportingInitializationContext.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockReportingInitializationContext.java index 454b742936..d1b8e5c84b 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/util/MockReportingInitializationContext.java +++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockReportingInitializationContext.java @@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.controller.ControllerServiceLookup; +import org.apache.nifi.controller.NodeTypeProvider; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.reporting.ReportingInitializationContext; import org.apache.nifi.scheduling.SchedulingStrategy; @@ -73,6 +74,11 @@ public class MockReportingInitializationContext extends MockControllerServiceLoo return this; } + @Override + public NodeTypeProvider getNodeTypeProvider() { + return null; + } + @Override public String getSchedulingPeriod() { return "0 sec"; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index acc3102aef..99d8ed0801 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -3150,7 +3150,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R if (firstTimeAdded) { final ReportingInitializationContext config = new StandardReportingInitializationContext(id, taskNode.getName(), - SchedulingStrategy.TIMER_DRIVEN, "1 min", taskNode.getLogger(), this, nifiProperties); + SchedulingStrategy.TIMER_DRIVEN, "1 min", taskNode.getLogger(), this, nifiProperties, this); try { taskNode.getReportingTask().initialize(config); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java index 3af270cc50..3d07456e1f 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java @@ -620,7 +620,7 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { final ComponentLog componentLog = new SimpleProcessLogger(dto.getId(), reportingTask.getReportingTask()); final ReportingInitializationContext config = new StandardReportingInitializationContext(dto.getId(), dto.getName(), - SchedulingStrategy.valueOf(dto.getSchedulingStrategy()), dto.getSchedulingPeriod(), componentLog, controller, nifiProperties); + SchedulingStrategy.valueOf(dto.getSchedulingStrategy()), dto.getSchedulingPeriod(), componentLog, controller, nifiProperties, controller); try { reportingTask.getReportingTask().initialize(config); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingInitializationContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingInitializationContext.java index d96d2b7000..ebe774bdcb 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingInitializationContext.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingInitializationContext.java @@ -22,6 +22,7 @@ import java.util.concurrent.TimeUnit; import org.apache.nifi.controller.ControllerService; import org.apache.nifi.controller.ControllerServiceLookup; +import org.apache.nifi.controller.NodeTypeProvider; import org.apache.nifi.controller.service.ControllerServiceProvider; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.reporting.ReportingInitializationContext; @@ -38,11 +39,13 @@ public class StandardReportingInitializationContext implements ReportingInitiali private final ControllerServiceProvider serviceProvider; private final ComponentLog logger; private final NiFiProperties nifiProperties; + private final NodeTypeProvider nodeTypeProvider; public StandardReportingInitializationContext( final String id, final String name, final SchedulingStrategy schedulingStrategy, final String schedulingPeriod, final ComponentLog logger, - final ControllerServiceProvider serviceProvider, final NiFiProperties nifiProperties) { + final ControllerServiceProvider serviceProvider, final NiFiProperties nifiProperties, + final NodeTypeProvider nodeTypeProvider) { this.id = id; this.name = name; this.schedulingPeriod = schedulingPeriod; @@ -50,6 +53,7 @@ public class StandardReportingInitializationContext implements ReportingInitiali this.schedulingStrategy = schedulingStrategy; this.logger = logger; this.nifiProperties = nifiProperties; + this.nodeTypeProvider = nodeTypeProvider; } @Override @@ -134,4 +138,9 @@ public class StandardReportingInitializationContext implements ReportingInitiali public File getKerberosConfigurationFile() { return nifiProperties.getKerberosConfigurationFile(); } + + @Override + public NodeTypeProvider getNodeTypeProvider() { + return nodeTypeProvider; + } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java index 2c59964238..0c4acd80a2 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java @@ -116,7 +116,7 @@ public class TestStandardProcessScheduler { reportingTask = new TestReportingTask(); final ReportingInitializationContext config = new StandardReportingInitializationContext(UUID.randomUUID().toString(), "Test", SchedulingStrategy.TIMER_DRIVEN, "5 secs", - Mockito.mock(ComponentLog.class), null, nifiProperties); + Mockito.mock(ComponentLog.class), null, nifiProperties, null); reportingTask.initialize(config); final ValidationContextFactory validationContextFactory = new StandardValidationContextFactory(null, variableRegistry); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/mock/MockReportingInitializationContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/mock/MockReportingInitializationContext.java index 630c657824..379a56e918 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/mock/MockReportingInitializationContext.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/mock/MockReportingInitializationContext.java @@ -17,6 +17,7 @@ package org.apache.nifi.mock; import org.apache.nifi.controller.ControllerServiceLookup; +import org.apache.nifi.controller.NodeTypeProvider; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.reporting.ReportingInitializationContext; import org.apache.nifi.scheduling.SchedulingStrategy; @@ -80,4 +81,9 @@ public class MockReportingInitializationContext implements ReportingInitializati public File getKerberosConfigurationFile() { return null; } + + @Override + public NodeTypeProvider getNodeTypeProvider() { + return null; + } }