mirror of https://github.com/apache/nifi.git
NIFI-4: Added better support for reporting task and controller service lifecycle via annotations
This commit is contained in:
parent
53328a4a02
commit
b5956709b7
|
@ -191,7 +191,6 @@ import org.apache.nifi.web.api.entity.ProvenanceEventEntity;
|
||||||
import org.apache.nifi.web.api.entity.RemoteProcessGroupEntity;
|
import org.apache.nifi.web.api.entity.RemoteProcessGroupEntity;
|
||||||
import org.apache.nifi.web.api.entity.RemoteProcessGroupsEntity;
|
import org.apache.nifi.web.api.entity.RemoteProcessGroupsEntity;
|
||||||
import org.apache.nifi.web.util.WebUtils;
|
import org.apache.nifi.web.util.WebUtils;
|
||||||
|
|
||||||
import org.apache.commons.lang3.StringUtils;
|
import org.apache.commons.lang3.StringUtils;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
@ -1290,18 +1289,6 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Adds an instance of a specified controller service.
|
|
||||||
*
|
|
||||||
* @param type
|
|
||||||
* @param id
|
|
||||||
* @param properties
|
|
||||||
* @return
|
|
||||||
*/
|
|
||||||
@Override
|
|
||||||
public ControllerServiceNode createControllerService(String type, String id, Map<String, String> properties) {
|
|
||||||
return controllerServiceProvider.createControllerService(type, id, properties);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ControllerService getControllerService(String serviceIdentifier) {
|
public ControllerService getControllerService(String serviceIdentifier) {
|
||||||
|
@ -1323,6 +1310,16 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
|
||||||
return controllerServiceProvider.isControllerServiceEnabled(serviceIdentifier);
|
return controllerServiceProvider.isControllerServiceEnabled(serviceIdentifier);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ControllerServiceNode createControllerService(final String type, final String id, final boolean firstTimeAdded) {
|
||||||
|
return controllerServiceProvider.createControllerService(type, id, firstTimeAdded);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void removeControllerService(final ControllerServiceNode serviceNode) {
|
||||||
|
controllerServiceProvider.removeControllerService(serviceNode);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Handle a bulletins message.
|
* Handle a bulletins message.
|
||||||
*
|
*
|
||||||
|
|
|
@ -73,4 +73,5 @@ public interface ReportingTaskNode extends ConfiguredComponent {
|
||||||
void verifyCanDisable();
|
void verifyCanDisable();
|
||||||
void verifyCanEnable();
|
void verifyCanEnable();
|
||||||
void verifyCanDelete();
|
void verifyCanDelete();
|
||||||
|
void verifyCanUpdate();
|
||||||
}
|
}
|
||||||
|
|
|
@ -43,4 +43,5 @@ public interface ControllerServiceNode extends ConfiguredComponent {
|
||||||
void verifyCanEnable();
|
void verifyCanEnable();
|
||||||
void verifyCanDisable();
|
void verifyCanDisable();
|
||||||
void verifyCanDelete();
|
void verifyCanDelete();
|
||||||
|
void verifyCanUpdate();
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,7 +19,6 @@ package org.apache.nifi.controller.reporting;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
|
||||||
import org.apache.nifi.annotation.lifecycle.OnAdded;
|
|
||||||
import org.apache.nifi.controller.AbstractConfiguredComponent;
|
import org.apache.nifi.controller.AbstractConfiguredComponent;
|
||||||
import org.apache.nifi.controller.Availability;
|
import org.apache.nifi.controller.Availability;
|
||||||
import org.apache.nifi.controller.ConfigurationContext;
|
import org.apache.nifi.controller.ConfigurationContext;
|
||||||
|
@ -27,7 +26,6 @@ import org.apache.nifi.controller.ControllerServiceLookup;
|
||||||
import org.apache.nifi.controller.ProcessScheduler;
|
import org.apache.nifi.controller.ProcessScheduler;
|
||||||
import org.apache.nifi.controller.ReportingTaskNode;
|
import org.apache.nifi.controller.ReportingTaskNode;
|
||||||
import org.apache.nifi.controller.ScheduledState;
|
import org.apache.nifi.controller.ScheduledState;
|
||||||
import org.apache.nifi.controller.StandardProcessorNode;
|
|
||||||
import org.apache.nifi.controller.ValidationContextFactory;
|
import org.apache.nifi.controller.ValidationContextFactory;
|
||||||
import org.apache.nifi.controller.annotation.OnConfigured;
|
import org.apache.nifi.controller.annotation.OnConfigured;
|
||||||
import org.apache.nifi.controller.exception.ProcessorLifeCycleException;
|
import org.apache.nifi.controller.exception.ProcessorLifeCycleException;
|
||||||
|
@ -145,6 +143,8 @@ public abstract class AbstractReportingTaskNode extends AbstractConfiguredCompon
|
||||||
}
|
}
|
||||||
|
|
||||||
private void onConfigured() {
|
private void onConfigured() {
|
||||||
|
// We need to invoke any method annotation with the OnConfigured annotation in order to
|
||||||
|
// maintain backward compatibility. This will be removed when we remove the old, deprecated annotations.
|
||||||
try (final NarCloseable x = NarCloseable.withNarLoader()) {
|
try (final NarCloseable x = NarCloseable.withNarLoader()) {
|
||||||
final ConfigurationContext configContext = new StandardConfigurationContext(this, serviceLookup);
|
final ConfigurationContext configContext = new StandardConfigurationContext(this, serviceLookup);
|
||||||
ReflectionUtils.invokeMethodsWithAnnotation(OnConfigured.class, reportingTask, configContext);
|
ReflectionUtils.invokeMethodsWithAnnotation(OnConfigured.class, reportingTask, configContext);
|
||||||
|
@ -153,11 +153,58 @@ public abstract class AbstractReportingTaskNode extends AbstractConfiguredCompon
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public boolean isDisabled() {
|
||||||
|
return scheduledState == ScheduledState.DISABLED;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void verifyCanDelete() {
|
public void verifyCanDelete() {
|
||||||
if (isRunning()) {
|
if (isRunning()) {
|
||||||
throw new IllegalStateException(this + " is running");
|
throw new IllegalStateException("Cannot delete " + reportingTask + " because it is currently running");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void verifyCanDisable() {
|
||||||
|
if ( isRunning() ) {
|
||||||
|
throw new IllegalStateException("Cannot disable " + reportingTask + " because it is currently running");
|
||||||
|
}
|
||||||
|
|
||||||
|
if ( isDisabled() ) {
|
||||||
|
throw new IllegalStateException("Cannot disable " + reportingTask + " because it is already disabled");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void verifyCanEnable() {
|
||||||
|
if ( !isDisabled() ) {
|
||||||
|
throw new IllegalStateException("Cannot enable " + reportingTask + " because it is not disabled");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void verifyCanStart() {
|
||||||
|
if ( isDisabled() ) {
|
||||||
|
throw new IllegalStateException("Cannot start " + reportingTask + " because it is currently disabled");
|
||||||
|
}
|
||||||
|
|
||||||
|
if ( isRunning() ) {
|
||||||
|
throw new IllegalStateException("Cannot start " + reportingTask + " because it is already running");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void verifyCanStop() {
|
||||||
|
if ( !isRunning() ) {
|
||||||
|
throw new IllegalStateException("Cannot stop " + reportingTask + " because it is not running");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void verifyCanUpdate() {
|
||||||
|
if ( isRunning() ) {
|
||||||
|
throw new IllegalStateException("Cannot update " + reportingTask + " because it is currently running");
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -123,23 +123,21 @@ public class ControllerServiceLoader {
|
||||||
|
|
||||||
final List<Element> serviceNodes = DomUtils.getChildElementsByTagName(servicesElement, "service");
|
final List<Element> serviceNodes = DomUtils.getChildElementsByTagName(servicesElement, "service");
|
||||||
for (final Element serviceElement : serviceNodes) {
|
for (final Element serviceElement : serviceNodes) {
|
||||||
//add global properties common to all tasks
|
|
||||||
Map<String, String> properties = new HashMap<>();
|
|
||||||
|
|
||||||
//get properties for the specific controller task - id, name, class,
|
//get properties for the specific controller task - id, name, class,
|
||||||
//and schedulingPeriod must be set
|
//and schedulingPeriod must be set
|
||||||
final String serviceId = DomUtils.getChild(serviceElement, "identifier").getTextContent().trim();
|
final String serviceId = DomUtils.getChild(serviceElement, "identifier").getTextContent().trim();
|
||||||
final String serviceClass = DomUtils.getChild(serviceElement, "class").getTextContent().trim();
|
final String serviceClass = DomUtils.getChild(serviceElement, "class").getTextContent().trim();
|
||||||
|
|
||||||
|
//set the class to be used for the configured controller task
|
||||||
|
final ControllerServiceNode serviceNode = provider.createControllerService(serviceClass, serviceId, false);
|
||||||
|
|
||||||
//optional task-specific properties
|
//optional task-specific properties
|
||||||
for (final Element optionalProperty : DomUtils.getChildElementsByTagName(serviceElement, "property")) {
|
for (final Element optionalProperty : DomUtils.getChildElementsByTagName(serviceElement, "property")) {
|
||||||
final String name = optionalProperty.getAttribute("name").trim();
|
final String name = optionalProperty.getAttribute("name").trim();
|
||||||
final String value = optionalProperty.getTextContent().trim();
|
final String value = optionalProperty.getTextContent().trim();
|
||||||
properties.put(name, value);
|
serviceNode.setProperty(name, value);
|
||||||
}
|
}
|
||||||
|
|
||||||
//set the class to be used for the configured controller task
|
|
||||||
final ControllerServiceNode serviceNode = provider.createControllerService(serviceClass, serviceId, properties);
|
|
||||||
services.add(serviceNode);
|
services.add(serviceNode);
|
||||||
serviceNode.setDisabled(false);
|
serviceNode.setDisabled(false);
|
||||||
}
|
}
|
||||||
|
|
|
@ -166,7 +166,30 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
|
||||||
@Override
|
@Override
|
||||||
public void verifyCanDelete() {
|
public void verifyCanDelete() {
|
||||||
if ( !isDisabled() ) {
|
if ( !isDisabled() ) {
|
||||||
throw new IllegalStateException(this + " cannot be deleted because it has not been disabled");
|
throw new IllegalStateException(implementation + " cannot be deleted because it is not disabled");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void verifyCanDisable() {
|
||||||
|
final ControllerServiceReference references = getReferences();
|
||||||
|
final int numRunning = references.getRunningReferences().size();
|
||||||
|
if ( numRunning > 0 ) {
|
||||||
|
throw new IllegalStateException(implementation + " cannot be disabled because it is referenced by " + numRunning + " components that are currently running");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void verifyCanEnable() {
|
||||||
|
if ( !isDisabled() ) {
|
||||||
|
throw new IllegalStateException(implementation + " cannot be enabled because it is not disabled");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void verifyCanUpdate() {
|
||||||
|
if ( !isDisabled() ) {
|
||||||
|
throw new IllegalStateException(implementation + " cannot be updated because it is not disabled");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -117,6 +117,7 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
|
||||||
public Object invoke(final Object proxy, final Method method, final Object[] args) throws Throwable {
|
public Object invoke(final Object proxy, final Method method, final Object[] args) throws Throwable {
|
||||||
final ControllerServiceNode node = serviceNodeHolder.get();
|
final ControllerServiceNode node = serviceNodeHolder.get();
|
||||||
if (node.isDisabled() && !validDisabledMethods.contains(method)) {
|
if (node.isDisabled() && !validDisabledMethods.contains(method)) {
|
||||||
|
// Use nar class loader here because we are implicitly calling toString() on the original implementation.
|
||||||
try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
|
try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
|
||||||
throw new IllegalStateException("Cannot invoke method " + method + " on Controller Service " + originalService + " because the Controller Service is disabled");
|
throw new IllegalStateException("Cannot invoke method " + method + " on Controller Service " + originalService + " because the Controller Service is disabled");
|
||||||
} catch (final Throwable e) {
|
} catch (final Throwable e) {
|
||||||
|
|
|
@ -384,8 +384,12 @@ public class ControllerFacade implements ControllerServiceProvider {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ControllerServiceNode createControllerService(String type, String id, Map<String, String> properties) {
|
public ControllerServiceNode createControllerService(String type, String id, boolean firstTimeAdded) {
|
||||||
return flowController.createControllerService(type, id, properties);
|
return flowController.createControllerService(type, id, firstTimeAdded);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void removeControllerService(ControllerServiceNode serviceNode) {
|
||||||
|
flowController.removeControllerService(serviceNode);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
Loading…
Reference in New Issue