NIFI-4546: Make ReportingTask aware of node type in a cluster

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #2235.
This commit is contained in:
Koji Kawamura 2017-10-30 10:19:07 +09:00 committed by Pierre Villard
parent d914ad2924
commit a73b5bda42
8 changed files with 46 additions and 6 deletions

View File

@ -20,8 +20,8 @@ import java.util.concurrent.TimeUnit;
import org.apache.nifi.components.AbstractConfigurableComponent; import org.apache.nifi.components.AbstractConfigurableComponent;
import org.apache.nifi.controller.ControllerServiceLookup; import org.apache.nifi.controller.ControllerServiceLookup;
import org.apache.nifi.controller.NodeTypeProvider;
import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessorInitializationContext;
public abstract class AbstractReportingTask extends AbstractConfigurableComponent implements ReportingTask { public abstract class AbstractReportingTask extends AbstractConfigurableComponent implements ReportingTask {
@ -30,6 +30,7 @@ public abstract class AbstractReportingTask extends AbstractConfigurableComponen
private long schedulingNanos; private long schedulingNanos;
private ControllerServiceLookup serviceLookup; private ControllerServiceLookup serviceLookup;
private ComponentLog logger; private ComponentLog logger;
private NodeTypeProvider nodeTypeProvider;
@Override @Override
public final void initialize(final ReportingInitializationContext config) throws InitializationException { public final void initialize(final ReportingInitializationContext config) throws InitializationException {
@ -38,18 +39,27 @@ public abstract class AbstractReportingTask extends AbstractConfigurableComponen
name = config.getName(); name = config.getName();
schedulingNanos = config.getSchedulingPeriod(TimeUnit.NANOSECONDS); schedulingNanos = config.getSchedulingPeriod(TimeUnit.NANOSECONDS);
serviceLookup = config.getControllerServiceLookup(); serviceLookup = config.getControllerServiceLookup();
nodeTypeProvider = config.getNodeTypeProvider();
init(config); init(config);
} }
/** /**
* @return the {@link ControllerServiceLookup} that was passed to the * @return the {@link ControllerServiceLookup} that was passed to the
* {@link #init(ProcessorInitializationContext)} method * {@link #initialize(ReportingInitializationContext)} method
*/ */
protected final ControllerServiceLookup getControllerServiceLookup() { protected final ControllerServiceLookup getControllerServiceLookup() {
return serviceLookup; return serviceLookup;
} }
/**
* @return the {@link NodeTypeProvider} that was passed to the
* {@link #initialize(ReportingInitializationContext)} method
*/
protected final NodeTypeProvider getNodeTypeProvider() {
return nodeTypeProvider;
}
/** /**
* @return the identifier of this Reporting Task * @return the identifier of this Reporting Task
*/ */

View File

@ -19,6 +19,7 @@ package org.apache.nifi.reporting;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.nifi.controller.ControllerServiceLookup; import org.apache.nifi.controller.ControllerServiceLookup;
import org.apache.nifi.controller.NodeTypeProvider;
import org.apache.nifi.kerberos.KerberosContext; import org.apache.nifi.kerberos.KerberosContext;
import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.scheduling.SchedulingStrategy; import org.apache.nifi.scheduling.SchedulingStrategy;
@ -75,4 +76,12 @@ public interface ReportingInitializationContext extends KerberosContext {
* way and generate bulletins when appropriate * way and generate bulletins when appropriate
*/ */
ComponentLog getLogger(); ComponentLog getLogger();
/**
* @return the {@link NodeTypeProvider} which can be used to detect the node
* type of this NiFi instance.
* @since Apache NiFi 1.5.0
*/
NodeTypeProvider getNodeTypeProvider();
} }

View File

@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.ControllerServiceLookup; import org.apache.nifi.controller.ControllerServiceLookup;
import org.apache.nifi.controller.NodeTypeProvider;
import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.reporting.ReportingInitializationContext; import org.apache.nifi.reporting.ReportingInitializationContext;
import org.apache.nifi.scheduling.SchedulingStrategy; import org.apache.nifi.scheduling.SchedulingStrategy;
@ -73,6 +74,11 @@ public class MockReportingInitializationContext extends MockControllerServiceLoo
return this; return this;
} }
@Override
public NodeTypeProvider getNodeTypeProvider() {
return null;
}
@Override @Override
public String getSchedulingPeriod() { public String getSchedulingPeriod() {
return "0 sec"; return "0 sec";

View File

@ -3150,7 +3150,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
if (firstTimeAdded) { if (firstTimeAdded) {
final ReportingInitializationContext config = new StandardReportingInitializationContext(id, taskNode.getName(), final ReportingInitializationContext config = new StandardReportingInitializationContext(id, taskNode.getName(),
SchedulingStrategy.TIMER_DRIVEN, "1 min", taskNode.getLogger(), this, nifiProperties); SchedulingStrategy.TIMER_DRIVEN, "1 min", taskNode.getLogger(), this, nifiProperties, this);
try { try {
taskNode.getReportingTask().initialize(config); taskNode.getReportingTask().initialize(config);

View File

@ -620,7 +620,7 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
final ComponentLog componentLog = new SimpleProcessLogger(dto.getId(), reportingTask.getReportingTask()); final ComponentLog componentLog = new SimpleProcessLogger(dto.getId(), reportingTask.getReportingTask());
final ReportingInitializationContext config = new StandardReportingInitializationContext(dto.getId(), dto.getName(), final ReportingInitializationContext config = new StandardReportingInitializationContext(dto.getId(), dto.getName(),
SchedulingStrategy.valueOf(dto.getSchedulingStrategy()), dto.getSchedulingPeriod(), componentLog, controller, nifiProperties); SchedulingStrategy.valueOf(dto.getSchedulingStrategy()), dto.getSchedulingPeriod(), componentLog, controller, nifiProperties, controller);
try { try {
reportingTask.getReportingTask().initialize(config); reportingTask.getReportingTask().initialize(config);

View File

@ -22,6 +22,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.nifi.controller.ControllerService; import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.ControllerServiceLookup; import org.apache.nifi.controller.ControllerServiceLookup;
import org.apache.nifi.controller.NodeTypeProvider;
import org.apache.nifi.controller.service.ControllerServiceProvider; import org.apache.nifi.controller.service.ControllerServiceProvider;
import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.reporting.ReportingInitializationContext; import org.apache.nifi.reporting.ReportingInitializationContext;
@ -38,11 +39,13 @@ public class StandardReportingInitializationContext implements ReportingInitiali
private final ControllerServiceProvider serviceProvider; private final ControllerServiceProvider serviceProvider;
private final ComponentLog logger; private final ComponentLog logger;
private final NiFiProperties nifiProperties; private final NiFiProperties nifiProperties;
private final NodeTypeProvider nodeTypeProvider;
public StandardReportingInitializationContext( public StandardReportingInitializationContext(
final String id, final String name, final SchedulingStrategy schedulingStrategy, final String id, final String name, final SchedulingStrategy schedulingStrategy,
final String schedulingPeriod, final ComponentLog logger, final String schedulingPeriod, final ComponentLog logger,
final ControllerServiceProvider serviceProvider, final NiFiProperties nifiProperties) { final ControllerServiceProvider serviceProvider, final NiFiProperties nifiProperties,
final NodeTypeProvider nodeTypeProvider) {
this.id = id; this.id = id;
this.name = name; this.name = name;
this.schedulingPeriod = schedulingPeriod; this.schedulingPeriod = schedulingPeriod;
@ -50,6 +53,7 @@ public class StandardReportingInitializationContext implements ReportingInitiali
this.schedulingStrategy = schedulingStrategy; this.schedulingStrategy = schedulingStrategy;
this.logger = logger; this.logger = logger;
this.nifiProperties = nifiProperties; this.nifiProperties = nifiProperties;
this.nodeTypeProvider = nodeTypeProvider;
} }
@Override @Override
@ -134,4 +138,9 @@ public class StandardReportingInitializationContext implements ReportingInitiali
public File getKerberosConfigurationFile() { public File getKerberosConfigurationFile() {
return nifiProperties.getKerberosConfigurationFile(); return nifiProperties.getKerberosConfigurationFile();
} }
@Override
public NodeTypeProvider getNodeTypeProvider() {
return nodeTypeProvider;
}
} }

View File

@ -116,7 +116,7 @@ public class TestStandardProcessScheduler {
reportingTask = new TestReportingTask(); reportingTask = new TestReportingTask();
final ReportingInitializationContext config = new StandardReportingInitializationContext(UUID.randomUUID().toString(), "Test", SchedulingStrategy.TIMER_DRIVEN, "5 secs", final ReportingInitializationContext config = new StandardReportingInitializationContext(UUID.randomUUID().toString(), "Test", SchedulingStrategy.TIMER_DRIVEN, "5 secs",
Mockito.mock(ComponentLog.class), null, nifiProperties); Mockito.mock(ComponentLog.class), null, nifiProperties, null);
reportingTask.initialize(config); reportingTask.initialize(config);
final ValidationContextFactory validationContextFactory = new StandardValidationContextFactory(null, variableRegistry); final ValidationContextFactory validationContextFactory = new StandardValidationContextFactory(null, variableRegistry);

View File

@ -17,6 +17,7 @@
package org.apache.nifi.mock; package org.apache.nifi.mock;
import org.apache.nifi.controller.ControllerServiceLookup; import org.apache.nifi.controller.ControllerServiceLookup;
import org.apache.nifi.controller.NodeTypeProvider;
import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.reporting.ReportingInitializationContext; import org.apache.nifi.reporting.ReportingInitializationContext;
import org.apache.nifi.scheduling.SchedulingStrategy; import org.apache.nifi.scheduling.SchedulingStrategy;
@ -80,4 +81,9 @@ public class MockReportingInitializationContext implements ReportingInitializati
public File getKerberosConfigurationFile() { public File getKerberosConfigurationFile() {
return null; return null;
} }
@Override
public NodeTypeProvider getNodeTypeProvider() {
return null;
}
} }