diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java index 81a858319e..c7dd22d6f4 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java @@ -30,6 +30,8 @@ import org.apache.nifi.annotation.lifecycle.OnConfigurationRestored; import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.annotation.lifecycle.OnUnscheduled; +import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange; +import org.apache.nifi.annotation.notification.PrimaryNodeState; import org.apache.nifi.authorization.Resource; import org.apache.nifi.authorization.resource.Authorizable; import org.apache.nifi.authorization.resource.ResourceFactory; @@ -90,6 +92,7 @@ import org.slf4j.LoggerFactory; import java.lang.management.ThreadInfo; import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; import java.net.URL; import java.util.ArrayList; import java.util.Collection; @@ -1598,6 +1601,24 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable } } + @Override + public void notifyPrimaryNodeChanged(final PrimaryNodeState nodeState, final LifecycleState lifecycleState) { + final Class implClass = getProcessor().getClass(); + final List methods = ReflectionUtils.findMethodsWithAnnotations(implClass, new Class[] {OnPrimaryNodeStateChange.class}); + if (methods.isEmpty()) { + return; + } + + lifecycleState.incrementActiveThreadCount(null); + activateThread(); + try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(getExtensionManager(), implClass, getIdentifier())) { + ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnPrimaryNodeStateChange.class, getProcessor(), nodeState); + } finally { + deactivateThread(); + lifecycleState.decrementActiveThreadCount(); + } + } + private void activateThread() { final Thread thread = Thread.currentThread(); final Long timestamp = System.currentTimeMillis(); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java index 22b8f1b418..0733627f12 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java @@ -17,6 +17,8 @@ package org.apache.nifi.controller.reporting; import org.apache.nifi.annotation.configuration.DefaultSchedule; +import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange; +import org.apache.nifi.annotation.notification.PrimaryNodeState; import org.apache.nifi.bundle.Bundle; import org.apache.nifi.bundle.BundleCoordinate; import org.apache.nifi.components.ConfigVerificationResult; @@ -36,6 +38,7 @@ import org.apache.nifi.controller.ReportingTaskNode; import org.apache.nifi.controller.ScheduledState; import org.apache.nifi.controller.TerminationAwareLogger; import org.apache.nifi.controller.ValidationContextFactory; +import org.apache.nifi.controller.scheduling.LifecycleState; import org.apache.nifi.controller.service.ControllerServiceNode; import org.apache.nifi.controller.service.ControllerServiceProvider; import org.apache.nifi.controller.service.StandardConfigurationContext; @@ -50,10 +53,12 @@ import org.apache.nifi.reporting.VerifiableReportingTask; import org.apache.nifi.scheduling.SchedulingStrategy; import org.apache.nifi.util.CharacterFilterUtils; import org.apache.nifi.util.FormatUtils; +import org.apache.nifi.util.ReflectionUtils; import org.apache.nifi.util.file.classloader.ClassLoaderUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.lang.reflect.Method; import java.net.URL; import java.util.ArrayList; import java.util.Collection; @@ -395,4 +400,23 @@ public abstract class AbstractReportingTaskNode extends AbstractComponentNode im return results; } + + @Override + public void notifyPrimaryNodeChanged(final PrimaryNodeState nodeState, final LifecycleState lifecycleState) { + final Class taskClass = getReportingTask().getClass(); + final List methods = ReflectionUtils.findMethodsWithAnnotations(taskClass, new Class[] {OnPrimaryNodeStateChange.class}); + if (methods.isEmpty()) { + return; + } + + lifecycleState.incrementActiveThreadCount(null); + try { + try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(getExtensionManager(), taskClass, getIdentifier())) { + ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnPrimaryNodeStateChange.class, getReportingTask(), nodeState); + } + } finally { + lifecycleState.decrementActiveThreadCount(); + } + } + } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java index 811fa9933c..95391be3b0 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java @@ -21,6 +21,8 @@ import org.apache.nifi.annotation.behavior.Restricted; import org.apache.nifi.annotation.documentation.DeprecationNotice; import org.apache.nifi.annotation.lifecycle.OnDisabled; import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange; +import org.apache.nifi.annotation.notification.PrimaryNodeState; import org.apache.nifi.authorization.Resource; import org.apache.nifi.authorization.resource.Authorizable; import org.apache.nifi.authorization.resource.ResourceFactory; @@ -66,6 +68,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; import java.net.URL; import java.util.ArrayList; import java.util.Collections; @@ -772,4 +775,17 @@ public class StandardControllerServiceNode extends AbstractComponentNode impleme this.bulletinLevel = level; } + @Override + public void notifyPrimaryNodeChanged(final PrimaryNodeState nodeState) { + final Class implementationClass = getControllerServiceImplementation().getClass(); + final List methods = ReflectionUtils.findMethodsWithAnnotations(implementationClass, new Class[] {OnPrimaryNodeStateChange.class}); + if (methods.isEmpty()) { + return; + } + + try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(getExtensionManager(), implementationClass, getIdentifier())) { + ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnPrimaryNodeStateChange.class, getControllerServiceImplementation(), nodeState); + } + } + } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/util/ReflectionUtils.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/util/ReflectionUtils.java index 5c5ef9ae2d..d04dfcce3b 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/util/ReflectionUtils.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/util/ReflectionUtils.java @@ -159,7 +159,7 @@ public class ReflectionUtils { return isSuccess; } - private static List findMethodsWithAnnotations(final Class clazz, final Class[] annotationClasses) { + public static List findMethodsWithAnnotations(final Class clazz, final Class[] annotationClasses) { // We use a cache here to store a mapping of Class & Annotation[] to those methods that contain the annotation. // This is done because discovering this using Reflection is fairly expensive (can take up to tens of milliseconds on laptop). // While this may not seem like much time, consider deleting a Process Group with thousands of Processors or instantiating diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractComponentNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractComponentNode.java index 162d421db8..5935bf7cc9 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractComponentNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractComponentNode.java @@ -1393,4 +1393,5 @@ public abstract class AbstractComponentNode implements ComponentNode { public boolean isReferencingParameter(final String parameterName) { return parameterReferenceCounts.containsKey(parameterName); } + } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessScheduler.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessScheduler.java index ff94934442..044ab24a7d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessScheduler.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessScheduler.java @@ -16,6 +16,7 @@ */ package org.apache.nifi.controller; +import org.apache.nifi.annotation.notification.PrimaryNodeState; import org.apache.nifi.connectable.Connectable; import org.apache.nifi.connectable.Funnel; import org.apache.nifi.connectable.Port; @@ -260,4 +261,11 @@ public interface ProcessScheduler { * @param task the task to perform */ Future submitFrameworkTask(Runnable task); + + void notifyPrimaryNodeStateChange(ProcessorNode processor, PrimaryNodeState primaryNodeState); + + void notifyPrimaryNodeStateChange(ControllerServiceNode service, PrimaryNodeState primaryNodeState); + + void notifyPrimaryNodeStateChange(ReportingTaskNode taskNode, PrimaryNodeState primaryNodeState); + } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java index e3e31b91ca..e1f5188852 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java @@ -17,6 +17,8 @@ package org.apache.nifi.controller; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.notification.PrimaryNodeState; +import org.apache.nifi.components.ConfigVerificationResult; import org.apache.nifi.components.validation.ValidationStatus; import org.apache.nifi.components.validation.ValidationTrigger; import org.apache.nifi.connectable.Connectable; @@ -27,7 +29,6 @@ import org.apache.nifi.controller.service.ControllerServiceProvider; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.logging.LogLevel; import org.apache.nifi.nar.ExtensionManager; -import org.apache.nifi.components.ConfigVerificationResult; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.Processor; import org.apache.nifi.processor.Relationship; @@ -300,4 +301,6 @@ public abstract class ProcessorNode extends AbstractComponentNode implements Con * @param context The ProcessContext associated with the Processor configuration */ public abstract void onConfigurationRestored(ProcessContext context); + + public abstract void notifyPrimaryNodeChanged(PrimaryNodeState primaryNodeState, LifecycleState lifecycleState); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ReportingTaskNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ReportingTaskNode.java index 83b0b411a2..fd19476306 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ReportingTaskNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ReportingTaskNode.java @@ -16,10 +16,12 @@ */ package org.apache.nifi.controller; +import org.apache.nifi.annotation.notification.PrimaryNodeState; +import org.apache.nifi.components.ConfigVerificationResult; +import org.apache.nifi.controller.scheduling.LifecycleState; import org.apache.nifi.controller.service.ControllerServiceNode; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.nar.ExtensionManager; -import org.apache.nifi.components.ConfigVerificationResult; import org.apache.nifi.reporting.ReportingContext; import org.apache.nifi.reporting.ReportingTask; import org.apache.nifi.scheduling.SchedulingStrategy; @@ -132,4 +134,7 @@ public interface ReportingTaskNode extends ComponentNode { void enable(); void disable(); + + void notifyPrimaryNodeChanged(PrimaryNodeState primaryNodeState, LifecycleState lifecycleState); + } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java index f02322bd6f..7a53506fb4 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java @@ -16,6 +16,8 @@ */ package org.apache.nifi.controller.service; +import org.apache.nifi.annotation.notification.PrimaryNodeState; +import org.apache.nifi.components.ConfigVerificationResult; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.VersionedComponent; import org.apache.nifi.controller.ComponentNode; @@ -26,7 +28,6 @@ import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.logging.LogLevel; import org.apache.nifi.nar.ExtensionManager; -import org.apache.nifi.components.ConfigVerificationResult; import java.util.List; import java.util.Map; @@ -241,4 +242,6 @@ public interface ControllerServiceNode extends ComponentNode, VersionedComponent final LoggableComponent proxiedControllerService, final ControllerServiceInvocationHandler invocationHandler); + void notifyPrimaryNodeChanged(PrimaryNodeState primaryNodeState); + } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index 601c73f98d..1d52e12159 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -19,7 +19,6 @@ package org.apache.nifi.controller; import org.apache.commons.lang3.StringUtils; import org.apache.nifi.admin.service.AuditService; import org.apache.nifi.annotation.lifecycle.OnConfigurationRestored; -import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange; import org.apache.nifi.annotation.notification.PrimaryNodeState; import org.apache.nifi.authorization.Authorizer; import org.apache.nifi.authorization.Resource; @@ -2522,21 +2521,15 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node public void setPrimary(final boolean primary) { final PrimaryNodeState nodeState = primary ? PrimaryNodeState.ELECTED_PRIMARY_NODE : PrimaryNodeState.PRIMARY_NODE_REVOKED; final ProcessGroup rootGroup = flowManager.getRootGroup(); + for (final ProcessorNode procNode : rootGroup.findAllProcessors()) { - try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(extensionManager, procNode.getProcessor().getClass(), procNode.getIdentifier())) { - ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnPrimaryNodeStateChange.class, procNode.getProcessor(), nodeState); - } + processScheduler.submitFrameworkTask(() -> processScheduler.notifyPrimaryNodeStateChange(procNode, nodeState) ); } for (final ControllerServiceNode serviceNode : flowManager.getAllControllerServices()) { - final Class serviceImplClass = serviceNode.getControllerServiceImplementation().getClass(); - try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(extensionManager, serviceImplClass, serviceNode.getIdentifier())) { - ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnPrimaryNodeStateChange.class, serviceNode.getControllerServiceImplementation(), nodeState); - } + processScheduler.submitFrameworkTask(() -> processScheduler.notifyPrimaryNodeStateChange(serviceNode, nodeState) ); } for (final ReportingTaskNode reportingTaskNode : getAllReportingTasks()) { - try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(extensionManager, reportingTaskNode.getReportingTask().getClass(), reportingTaskNode.getIdentifier())) { - ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnPrimaryNodeStateChange.class, reportingTaskNode.getReportingTask(), nodeState); - } + processScheduler.submitFrameworkTask(() -> processScheduler.notifyPrimaryNodeStateChange(reportingTaskNode, nodeState) ); } // update primary diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java index 87a5ac558f..c9633bf89d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java @@ -20,6 +20,7 @@ import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.lifecycle.OnShutdown; import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.annotation.lifecycle.OnUnscheduled; +import org.apache.nifi.annotation.notification.PrimaryNodeState; import org.apache.nifi.components.state.StateManager; import org.apache.nifi.components.state.StateManagerProvider; import org.apache.nifi.components.validation.ValidationStatus; @@ -451,6 +452,23 @@ public final class StandardProcessScheduler implements ProcessScheduler { LOG.info("Successfully terminated {} with {} active threads", procNode, tasksTerminated); } + @Override + public void notifyPrimaryNodeStateChange(final ProcessorNode processor, final PrimaryNodeState primaryNodeState) { + final LifecycleState lifecycleState = getLifecycleState(processor, false); + processor.notifyPrimaryNodeChanged(primaryNodeState, lifecycleState); + } + + @Override + public void notifyPrimaryNodeStateChange(final ReportingTaskNode taskNode, final PrimaryNodeState primaryNodeState) { + final LifecycleState lifecycleState = getLifecycleState(taskNode, false); + taskNode.notifyPrimaryNodeChanged(primaryNodeState, lifecycleState); + } + + @Override + public void notifyPrimaryNodeStateChange(final ControllerServiceNode service, final PrimaryNodeState primaryNodeState) { + service.notifyPrimaryNodeChanged(primaryNodeState); + } + @Override public void onProcessorRemoved(final ProcessorNode procNode) { lifecycleStates.remove(procNode); diff --git a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/controller/scheduling/StatelessProcessScheduler.java b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/controller/scheduling/StatelessProcessScheduler.java index b47b43a483..faf6e4ee9a 100644 --- a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/controller/scheduling/StatelessProcessScheduler.java +++ b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/controller/scheduling/StatelessProcessScheduler.java @@ -19,6 +19,7 @@ package org.apache.nifi.controller.scheduling; import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.lifecycle.OnShutdown; +import org.apache.nifi.annotation.notification.PrimaryNodeState; import org.apache.nifi.components.validation.ValidationStatus; import org.apache.nifi.connectable.Connectable; import org.apache.nifi.connectable.Funnel; @@ -316,4 +317,16 @@ public class StatelessProcessScheduler implements ProcessScheduler { public Future submitFrameworkTask(final Runnable task) { return null; } + + @Override + public void notifyPrimaryNodeStateChange(final ProcessorNode processor, final PrimaryNodeState primaryNodeState) { + } + + @Override + public void notifyPrimaryNodeStateChange(final ControllerServiceNode service, final PrimaryNodeState primaryNodeState) { + } + + @Override + public void notifyPrimaryNodeStateChange(final ReportingTaskNode taskNode, final PrimaryNodeState primaryNodeState) { + } } diff --git a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/CountPrimaryNodeChangeEvents.java b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/CountPrimaryNodeChangeEvents.java new file mode 100644 index 0000000000..4040ca1aa9 --- /dev/null +++ b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/CountPrimaryNodeChangeEvents.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.tests.system; + +import org.apache.nifi.annotation.behavior.TriggerWhenEmpty; +import org.apache.nifi.annotation.configuration.DefaultSchedule; +import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.processor.AbstractSessionFactoryProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessSessionFactory; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; + +import java.util.Collections; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +@TriggerWhenEmpty +@DefaultSchedule(period="100 millis") +public class CountPrimaryNodeChangeEvents extends AbstractSessionFactoryProcessor { + private static final String nodeNumber = System.getProperty("nodeNumber"); + + static final PropertyDescriptor EVENT_SLEEP_DURATION = new PropertyDescriptor.Builder() + .name("Event Sleep Duration") + .displayName("Event Sleep Duration") + .description("The amount of time to sleep when the onPrimaryNodeChange event occurs") + .required(true) + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .defaultValue("0 sec") + .build(); + + @Override + protected List getSupportedPropertyDescriptors() { + return Collections.singletonList(EVENT_SLEEP_DURATION); + } + + private final AtomicReference sessionReference = new AtomicReference<>(); + private volatile long sleepMillis = 0L; + + @OnPrimaryNodeStateChange + public void onPrimaryNodeChange() { + final ProcessSession session = sessionReference.get(); + if (session == null) { + return; + } + + session.adjustCounter("PrimaryNodeChangeCalled-" + nodeNumber, 1L, true); + + try { + Thread.sleep(sleepMillis); + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); + } + + session.adjustCounter("PrimaryNodeChangeCompleted-" + nodeNumber, 1L, true); + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException { + final ProcessSession current = sessionReference.get(); + if (current == null) { + final ProcessSession session = sessionFactory.createSession(); + sessionReference.compareAndSet(null, session); + } + + sleepMillis = context.getProperty(EVENT_SLEEP_DURATION).asTimePeriod(TimeUnit.MILLISECONDS); + sessionReference.get().adjustCounter("Triggers-" + nodeNumber, 1L, true); + } +} diff --git a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index 83e1f6a3fd..a7919d8331 100644 --- a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -31,6 +31,7 @@ org.apache.nifi.processors.tests.system.LoopFlowFile org.apache.nifi.processors.tests.system.PartitionText org.apache.nifi.processors.tests.system.PassThrough org.apache.nifi.processors.tests.system.PassThroughRequiresInstanceClassLoading +org.apache.nifi.processors.tests.system.CountPrimaryNodeChangeEvents org.apache.nifi.processors.tests.system.ReplaceWithFile org.apache.nifi.processors.tests.system.ReverseContents org.apache.nifi.processors.tests.system.RoundRobinFlowFiles diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java index bacb480a5a..5ccd54ae5c 100644 --- a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java +++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java @@ -213,23 +213,24 @@ public abstract class NiFiSystemIT implements NiFiInstanceProvider { final long maxTime = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(60); while (true) { + int connectedNodeCount = -1; try { final ClusteSummaryEntity clusterSummary = client.getFlowClient().getClusterSummary(); - final int connectedNodeCount = clusterSummary.getClusterSummary().getConnectedNodeCount(); + connectedNodeCount = clusterSummary.getClusterSummary().getConnectedNodeCount(); if (connectedNodeCount == expectedNumberOfNodes) { logger.info("Wait successful, {} nodes connected", expectedNumberOfNodes); return; } logEverySecond("Waiting for {} nodes to connect but currently only {} nodes are connected", expectedNumberOfNodes, connectedNodeCount); - - if (System.currentTimeMillis() > maxTime) { - throw new RuntimeException("Waited up to 60 seconds for both nodes to connect but only " + connectedNodeCount + " nodes connected"); - } } catch (final Exception e) { e.printStackTrace(); } + if (System.currentTimeMillis() > maxTime) { + throw new RuntimeException("Waited up to 60 seconds for both nodes to connect but only " + connectedNodeCount + " nodes connected"); + } + try { Thread.sleep(sleepMillis); } catch (final InterruptedException ie) { diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/PrimaryNodeChangeNotificationIT.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/PrimaryNodeChangeNotificationIT.java new file mode 100644 index 0000000000..d8e75251f8 --- /dev/null +++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/PrimaryNodeChangeNotificationIT.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.tests.system.clustering; + +import org.apache.nifi.cluster.coordination.node.ClusterRoles; +import org.apache.nifi.tests.system.NiFiInstanceFactory; +import org.apache.nifi.tests.system.NiFiSystemIT; +import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException; +import org.apache.nifi.toolkit.cli.impl.client.nifi.ProcessorClient; +import org.apache.nifi.web.api.dto.NodeDTO; +import org.apache.nifi.web.api.entity.ClusterEntity; +import org.apache.nifi.web.api.entity.ProcessorEntity; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class PrimaryNodeChangeNotificationIT extends NiFiSystemIT { + @Override + public NiFiInstanceFactory getInstanceFactory() { + return createTwoNodeInstanceFactory(); + } + + @Test + public void testNotifications() throws NiFiClientException, IOException, InterruptedException { + final ProcessorEntity processor = getClientUtil().createProcessor("CountPrimaryNodeChangeEvents"); + getClientUtil().startProcessor(processor); + + // Wait for processor to be triggered on both nodes + Map counters = new HashMap<>(); + while (!counters.containsKey("Triggers-1") || !counters.containsKey("Triggers-2")) { + Thread.sleep(10L); + counters = getClientUtil().getCountersAsMap(processor.getId()); + } + + final NodeDTO primaryNode = getNode(ClusterRoles.PRIMARY_NODE, false); + final NodeDTO nonPrimaryNode = getNode(ClusterRoles.PRIMARY_NODE, true); + + getClientUtil().disconnectNode(primaryNode.getNodeId()); + setupClient(nonPrimaryNode.getApiPort()); + waitForNodeStatus(primaryNode, "DISCONNECTED"); + + getClientUtil().connectNode(primaryNode.getNodeId()); + waitForAllNodesConnected(); + + waitFor(() -> { + final Map counterMap = getClientUtil().getCountersAsMap(processor.getId()); + final Long notificationCalledNode1 = counterMap.get("PrimaryNodeChangeCalled-1"); + final Long notificationCalledNode2 = counterMap.get("PrimaryNodeChangeCalled-2"); + final Long notificationCompletedNode1 = counterMap.get("PrimaryNodeChangeCompleted-1"); + final Long notificationCompletedNode2 = counterMap.get("PrimaryNodeChangeCompleted-2"); + + return notificationCalledNode1 > 0 && notificationCalledNode2 > 0 && notificationCompletedNode1 > 0 && notificationCompletedNode2 > 0; + }); + } + + @Test + public void testTerminateNotificationWhenBlocked() throws NiFiClientException, IOException, InterruptedException { + final ProcessorEntity processor = getClientUtil().createProcessor("CountPrimaryNodeChangeEvents"); + + // Set the event sleep duration to 10 minutes so that the processor will be blocked for a while when the primary node changes + getClientUtil().updateProcessorProperties(processor, Collections.singletonMap("Event Sleep Duration", "10 mins")); + getClientUtil().startProcessor(processor); + + // Wait for processor to be triggered on both nodes + Map counters = new HashMap<>(); + while (!counters.containsKey("Triggers-1") || !counters.containsKey("Triggers-2")) { + Thread.sleep(10L); + counters = getClientUtil().getCountersAsMap(processor.getId()); + } + + final NodeDTO primaryNode = getNode(ClusterRoles.PRIMARY_NODE, false); + final NodeDTO nonPrimaryNode = getNode(ClusterRoles.PRIMARY_NODE, true); + + getClientUtil().disconnectNode(primaryNode.getNodeId()); + setupClient(nonPrimaryNode.getApiPort()); + waitForNodeStatus(primaryNode, "DISCONNECTED"); + + getClientUtil().connectNode(primaryNode.getNodeId()); + waitForAllNodesConnected(); + + // Wait until the "called" counter is incremented but hte ChangeCompleted counter is not + waitFor(() -> { + final Map counterMap = getClientUtil().getCountersAsMap(processor.getId()); + final Long notificationCalledNode1 = counterMap.get("PrimaryNodeChangeCalled-1"); + final Long notificationCalledNode2 = counterMap.get("PrimaryNodeChangeCalled-2"); + final Long notificationCompletedNode1 = counterMap.get("PrimaryNodeChangeCompleted-1"); + final Long notificationCompletedNode2 = counterMap.get("PrimaryNodeChangeCompleted-2"); + + return notificationCalledNode1 > 0 && notificationCalledNode2 > 0 && notificationCompletedNode1 == null && notificationCompletedNode2 == null; + }); + + // wait 1 second and check again to make sure the ChangeCompleted counter is still not incremented + Thread.sleep(1000L); + + waitFor(() -> { + final Map counterMap = getClientUtil().getCountersAsMap(processor.getId()); + final Long notificationCompletedNode1 = counterMap.get("PrimaryNodeChangeCompleted-1"); + final Long notificationCompletedNode2 = counterMap.get("PrimaryNodeChangeCompleted-2"); + + return notificationCompletedNode1 == null && notificationCompletedNode2 == null; + }); + + final ProcessorClient processorClient = getNifiClient().getProcessorClient(); + assertTrue(processorClient.getProcessor(processor.getId()).getStatus().getAggregateSnapshot().getActiveThreadCount() > 0); + processorClient.stopProcessor(processor); + + // Wait a bit and make sure we still see a thread + Thread.sleep(1000L); + assertTrue(processorClient.getProcessor(processor.getId()).getStatus().getAggregateSnapshot().getActiveThreadCount() > 0); + + // Terminate the processor + processorClient.terminateProcessor(processor.getId()); + + // Wait for no threads to be active + waitFor(() -> processorClient.getProcessor(processor.getId()).getStatus().getAggregateSnapshot().getActiveThreadCount() == 0); + } + + + private NodeDTO getNode(final String roleName, final boolean invert) throws InterruptedException, NiFiClientException, IOException { + while (true) { + final ClusterEntity clusterEntity = getNifiClient().getControllerClient().getNodes(); + final Optional optionalPrimaryNodeDto = clusterEntity.getCluster().getNodes().stream() + .filter(node -> invert != node.getRoles().contains(roleName)) + .findFirst(); + + if (optionalPrimaryNodeDto.isPresent()) { + return optionalPrimaryNodeDto.get(); + } + + Thread.sleep(100L); + } + } +} diff --git a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyProcessorClient.java b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyProcessorClient.java index bcd22ed046..917829e97d 100644 --- a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyProcessorClient.java +++ b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyProcessorClient.java @@ -288,5 +288,4 @@ public class JerseyProcessorClient extends AbstractJerseyClient implements Proce return getRequestBuilder(target).delete(ProcessorEntity.class); }); } - }