From 8eeebdb61402ebcceba326467aace9631517020d Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Wed, 15 Mar 2023 13:07:59 -0400 Subject: [PATCH] NIFI-11290 Run Component Primary Node State changes in background thread - Ensure that components are notified that primary node has changed in a background thread instead of the Leader Election thread and activate/deactivate the thread in the case of Processors so that they can be viewed in the UI and terminated - Fixed system tests that would fail intermittently because they did not wait for node disconnection to complete and did not properly switch the client to look at the connected node before checking cluster status This closes #7052 Signed-off-by: David Handermann --- .../controller/StandardProcessorNode.java | 21 +++ .../reporting/AbstractReportingTaskNode.java | 24 +++ .../StandardControllerServiceNode.java | 16 ++ .../org/apache/nifi/util/ReflectionUtils.java | 2 +- .../controller/AbstractComponentNode.java | 1 + .../nifi/controller/ProcessScheduler.java | 8 + .../apache/nifi/controller/ProcessorNode.java | 5 +- .../nifi/controller/ReportingTaskNode.java | 7 +- .../service/ControllerServiceNode.java | 5 +- .../nifi/controller/FlowController.java | 15 +- .../scheduling/StandardProcessScheduler.java | 18 ++ .../scheduling/StatelessProcessScheduler.java | 13 ++ .../system/CountPrimaryNodeChangeEvents.java | 87 ++++++++++ .../org.apache.nifi.processor.Processor | 1 + .../nifi/tests/system/NiFiSystemIT.java | 11 +- .../PrimaryNodeChangeNotificationIT.java | 154 ++++++++++++++++++ .../nifi/impl/JerseyProcessorClient.java | 1 - 17 files changed, 368 insertions(+), 21 deletions(-) create mode 100644 nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/CountPrimaryNodeChangeEvents.java create mode 100644 nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/PrimaryNodeChangeNotificationIT.java diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java index 81a858319e..c7dd22d6f4 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java @@ -30,6 +30,8 @@ import org.apache.nifi.annotation.lifecycle.OnConfigurationRestored; import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.annotation.lifecycle.OnUnscheduled; +import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange; +import org.apache.nifi.annotation.notification.PrimaryNodeState; import org.apache.nifi.authorization.Resource; import org.apache.nifi.authorization.resource.Authorizable; import org.apache.nifi.authorization.resource.ResourceFactory; @@ -90,6 +92,7 @@ import org.slf4j.LoggerFactory; import java.lang.management.ThreadInfo; import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; import java.net.URL; import java.util.ArrayList; import java.util.Collection; @@ -1598,6 +1601,24 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable } } + @Override + public void notifyPrimaryNodeChanged(final PrimaryNodeState nodeState, final LifecycleState lifecycleState) { + final Class implClass = getProcessor().getClass(); + final List methods = ReflectionUtils.findMethodsWithAnnotations(implClass, new Class[] {OnPrimaryNodeStateChange.class}); + if (methods.isEmpty()) { + return; + } + + lifecycleState.incrementActiveThreadCount(null); + activateThread(); + try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(getExtensionManager(), implClass, getIdentifier())) { + ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnPrimaryNodeStateChange.class, getProcessor(), nodeState); + } finally { + deactivateThread(); + lifecycleState.decrementActiveThreadCount(); + } + } + private void activateThread() { final Thread thread = Thread.currentThread(); final Long timestamp = System.currentTimeMillis(); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java index 22b8f1b418..0733627f12 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java @@ -17,6 +17,8 @@ package org.apache.nifi.controller.reporting; import org.apache.nifi.annotation.configuration.DefaultSchedule; +import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange; +import org.apache.nifi.annotation.notification.PrimaryNodeState; import org.apache.nifi.bundle.Bundle; import org.apache.nifi.bundle.BundleCoordinate; import org.apache.nifi.components.ConfigVerificationResult; @@ -36,6 +38,7 @@ import org.apache.nifi.controller.ReportingTaskNode; import org.apache.nifi.controller.ScheduledState; import org.apache.nifi.controller.TerminationAwareLogger; import org.apache.nifi.controller.ValidationContextFactory; +import org.apache.nifi.controller.scheduling.LifecycleState; import org.apache.nifi.controller.service.ControllerServiceNode; import org.apache.nifi.controller.service.ControllerServiceProvider; import org.apache.nifi.controller.service.StandardConfigurationContext; @@ -50,10 +53,12 @@ import org.apache.nifi.reporting.VerifiableReportingTask; import org.apache.nifi.scheduling.SchedulingStrategy; import org.apache.nifi.util.CharacterFilterUtils; import org.apache.nifi.util.FormatUtils; +import org.apache.nifi.util.ReflectionUtils; import org.apache.nifi.util.file.classloader.ClassLoaderUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.lang.reflect.Method; import java.net.URL; import java.util.ArrayList; import java.util.Collection; @@ -395,4 +400,23 @@ public abstract class AbstractReportingTaskNode extends AbstractComponentNode im return results; } + + @Override + public void notifyPrimaryNodeChanged(final PrimaryNodeState nodeState, final LifecycleState lifecycleState) { + final Class taskClass = getReportingTask().getClass(); + final List methods = ReflectionUtils.findMethodsWithAnnotations(taskClass, new Class[] {OnPrimaryNodeStateChange.class}); + if (methods.isEmpty()) { + return; + } + + lifecycleState.incrementActiveThreadCount(null); + try { + try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(getExtensionManager(), taskClass, getIdentifier())) { + ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnPrimaryNodeStateChange.class, getReportingTask(), nodeState); + } + } finally { + lifecycleState.decrementActiveThreadCount(); + } + } + } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java index 811fa9933c..95391be3b0 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java @@ -21,6 +21,8 @@ import org.apache.nifi.annotation.behavior.Restricted; import org.apache.nifi.annotation.documentation.DeprecationNotice; import org.apache.nifi.annotation.lifecycle.OnDisabled; import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange; +import org.apache.nifi.annotation.notification.PrimaryNodeState; import org.apache.nifi.authorization.Resource; import org.apache.nifi.authorization.resource.Authorizable; import org.apache.nifi.authorization.resource.ResourceFactory; @@ -66,6 +68,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; import java.net.URL; import java.util.ArrayList; import java.util.Collections; @@ -772,4 +775,17 @@ public class StandardControllerServiceNode extends AbstractComponentNode impleme this.bulletinLevel = level; } + @Override + public void notifyPrimaryNodeChanged(final PrimaryNodeState nodeState) { + final Class implementationClass = getControllerServiceImplementation().getClass(); + final List methods = ReflectionUtils.findMethodsWithAnnotations(implementationClass, new Class[] {OnPrimaryNodeStateChange.class}); + if (methods.isEmpty()) { + return; + } + + try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(getExtensionManager(), implementationClass, getIdentifier())) { + ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnPrimaryNodeStateChange.class, getControllerServiceImplementation(), nodeState); + } + } + } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/util/ReflectionUtils.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/util/ReflectionUtils.java index 5c5ef9ae2d..d04dfcce3b 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/util/ReflectionUtils.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/util/ReflectionUtils.java @@ -159,7 +159,7 @@ public class ReflectionUtils { return isSuccess; } - private static List findMethodsWithAnnotations(final Class clazz, final Class[] annotationClasses) { + public static List findMethodsWithAnnotations(final Class clazz, final Class[] annotationClasses) { // We use a cache here to store a mapping of Class & Annotation[] to those methods that contain the annotation. // This is done because discovering this using Reflection is fairly expensive (can take up to tens of milliseconds on laptop). // While this may not seem like much time, consider deleting a Process Group with thousands of Processors or instantiating diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractComponentNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractComponentNode.java index 162d421db8..5935bf7cc9 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractComponentNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractComponentNode.java @@ -1393,4 +1393,5 @@ public abstract class AbstractComponentNode implements ComponentNode { public boolean isReferencingParameter(final String parameterName) { return parameterReferenceCounts.containsKey(parameterName); } + } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessScheduler.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessScheduler.java index ff94934442..044ab24a7d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessScheduler.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessScheduler.java @@ -16,6 +16,7 @@ */ package org.apache.nifi.controller; +import org.apache.nifi.annotation.notification.PrimaryNodeState; import org.apache.nifi.connectable.Connectable; import org.apache.nifi.connectable.Funnel; import org.apache.nifi.connectable.Port; @@ -260,4 +261,11 @@ public interface ProcessScheduler { * @param task the task to perform */ Future submitFrameworkTask(Runnable task); + + void notifyPrimaryNodeStateChange(ProcessorNode processor, PrimaryNodeState primaryNodeState); + + void notifyPrimaryNodeStateChange(ControllerServiceNode service, PrimaryNodeState primaryNodeState); + + void notifyPrimaryNodeStateChange(ReportingTaskNode taskNode, PrimaryNodeState primaryNodeState); + } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java index e3e31b91ca..e1f5188852 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java @@ -17,6 +17,8 @@ package org.apache.nifi.controller; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.notification.PrimaryNodeState; +import org.apache.nifi.components.ConfigVerificationResult; import org.apache.nifi.components.validation.ValidationStatus; import org.apache.nifi.components.validation.ValidationTrigger; import org.apache.nifi.connectable.Connectable; @@ -27,7 +29,6 @@ import org.apache.nifi.controller.service.ControllerServiceProvider; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.logging.LogLevel; import org.apache.nifi.nar.ExtensionManager; -import org.apache.nifi.components.ConfigVerificationResult; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.Processor; import org.apache.nifi.processor.Relationship; @@ -300,4 +301,6 @@ public abstract class ProcessorNode extends AbstractComponentNode implements Con * @param context The ProcessContext associated with the Processor configuration */ public abstract void onConfigurationRestored(ProcessContext context); + + public abstract void notifyPrimaryNodeChanged(PrimaryNodeState primaryNodeState, LifecycleState lifecycleState); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ReportingTaskNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ReportingTaskNode.java index 83b0b411a2..fd19476306 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ReportingTaskNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ReportingTaskNode.java @@ -16,10 +16,12 @@ */ package org.apache.nifi.controller; +import org.apache.nifi.annotation.notification.PrimaryNodeState; +import org.apache.nifi.components.ConfigVerificationResult; +import org.apache.nifi.controller.scheduling.LifecycleState; import org.apache.nifi.controller.service.ControllerServiceNode; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.nar.ExtensionManager; -import org.apache.nifi.components.ConfigVerificationResult; import org.apache.nifi.reporting.ReportingContext; import org.apache.nifi.reporting.ReportingTask; import org.apache.nifi.scheduling.SchedulingStrategy; @@ -132,4 +134,7 @@ public interface ReportingTaskNode extends ComponentNode { void enable(); void disable(); + + void notifyPrimaryNodeChanged(PrimaryNodeState primaryNodeState, LifecycleState lifecycleState); + } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java index f02322bd6f..7a53506fb4 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java @@ -16,6 +16,8 @@ */ package org.apache.nifi.controller.service; +import org.apache.nifi.annotation.notification.PrimaryNodeState; +import org.apache.nifi.components.ConfigVerificationResult; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.VersionedComponent; import org.apache.nifi.controller.ComponentNode; @@ -26,7 +28,6 @@ import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.logging.LogLevel; import org.apache.nifi.nar.ExtensionManager; -import org.apache.nifi.components.ConfigVerificationResult; import java.util.List; import java.util.Map; @@ -241,4 +242,6 @@ public interface ControllerServiceNode extends ComponentNode, VersionedComponent final LoggableComponent proxiedControllerService, final ControllerServiceInvocationHandler invocationHandler); + void notifyPrimaryNodeChanged(PrimaryNodeState primaryNodeState); + } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index 376bdcc1f9..1f470c9ab2 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -19,7 +19,6 @@ package org.apache.nifi.controller; import org.apache.commons.lang3.StringUtils; import org.apache.nifi.admin.service.AuditService; import org.apache.nifi.annotation.lifecycle.OnConfigurationRestored; -import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange; import org.apache.nifi.annotation.notification.PrimaryNodeState; import org.apache.nifi.authorization.Authorizer; import org.apache.nifi.authorization.Resource; @@ -2520,21 +2519,15 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node public void setPrimary(final boolean primary) { final PrimaryNodeState nodeState = primary ? PrimaryNodeState.ELECTED_PRIMARY_NODE : PrimaryNodeState.PRIMARY_NODE_REVOKED; final ProcessGroup rootGroup = flowManager.getRootGroup(); + for (final ProcessorNode procNode : rootGroup.findAllProcessors()) { - try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(extensionManager, procNode.getProcessor().getClass(), procNode.getIdentifier())) { - ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnPrimaryNodeStateChange.class, procNode.getProcessor(), nodeState); - } + processScheduler.submitFrameworkTask(() -> processScheduler.notifyPrimaryNodeStateChange(procNode, nodeState) ); } for (final ControllerServiceNode serviceNode : flowManager.getAllControllerServices()) { - final Class serviceImplClass = serviceNode.getControllerServiceImplementation().getClass(); - try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(extensionManager, serviceImplClass, serviceNode.getIdentifier())) { - ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnPrimaryNodeStateChange.class, serviceNode.getControllerServiceImplementation(), nodeState); - } + processScheduler.submitFrameworkTask(() -> processScheduler.notifyPrimaryNodeStateChange(serviceNode, nodeState) ); } for (final ReportingTaskNode reportingTaskNode : getAllReportingTasks()) { - try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(extensionManager, reportingTaskNode.getReportingTask().getClass(), reportingTaskNode.getIdentifier())) { - ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnPrimaryNodeStateChange.class, reportingTaskNode.getReportingTask(), nodeState); - } + processScheduler.submitFrameworkTask(() -> processScheduler.notifyPrimaryNodeStateChange(reportingTaskNode, nodeState) ); } // update primary diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java index 2c340de2cc..e2aa0345b7 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java @@ -20,6 +20,7 @@ import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.lifecycle.OnShutdown; import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.annotation.lifecycle.OnUnscheduled; +import org.apache.nifi.annotation.notification.PrimaryNodeState; import org.apache.nifi.components.state.StateManager; import org.apache.nifi.components.state.StateManagerProvider; import org.apache.nifi.components.validation.ValidationStatus; @@ -451,6 +452,23 @@ public final class StandardProcessScheduler implements ProcessScheduler { LOG.info("Successfully terminated {} with {} active threads", procNode, tasksTerminated); } + @Override + public void notifyPrimaryNodeStateChange(final ProcessorNode processor, final PrimaryNodeState primaryNodeState) { + final LifecycleState lifecycleState = getLifecycleState(processor, false); + processor.notifyPrimaryNodeChanged(primaryNodeState, lifecycleState); + } + + @Override + public void notifyPrimaryNodeStateChange(final ReportingTaskNode taskNode, final PrimaryNodeState primaryNodeState) { + final LifecycleState lifecycleState = getLifecycleState(taskNode, false); + taskNode.notifyPrimaryNodeChanged(primaryNodeState, lifecycleState); + } + + @Override + public void notifyPrimaryNodeStateChange(final ControllerServiceNode service, final PrimaryNodeState primaryNodeState) { + service.notifyPrimaryNodeChanged(primaryNodeState); + } + @Override public void onProcessorRemoved(final ProcessorNode procNode) { lifecycleStates.remove(procNode); diff --git a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/controller/scheduling/StatelessProcessScheduler.java b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/controller/scheduling/StatelessProcessScheduler.java index b47b43a483..faf6e4ee9a 100644 --- a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/controller/scheduling/StatelessProcessScheduler.java +++ b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/controller/scheduling/StatelessProcessScheduler.java @@ -19,6 +19,7 @@ package org.apache.nifi.controller.scheduling; import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.lifecycle.OnShutdown; +import org.apache.nifi.annotation.notification.PrimaryNodeState; import org.apache.nifi.components.validation.ValidationStatus; import org.apache.nifi.connectable.Connectable; import org.apache.nifi.connectable.Funnel; @@ -316,4 +317,16 @@ public class StatelessProcessScheduler implements ProcessScheduler { public Future submitFrameworkTask(final Runnable task) { return null; } + + @Override + public void notifyPrimaryNodeStateChange(final ProcessorNode processor, final PrimaryNodeState primaryNodeState) { + } + + @Override + public void notifyPrimaryNodeStateChange(final ControllerServiceNode service, final PrimaryNodeState primaryNodeState) { + } + + @Override + public void notifyPrimaryNodeStateChange(final ReportingTaskNode taskNode, final PrimaryNodeState primaryNodeState) { + } } diff --git a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/CountPrimaryNodeChangeEvents.java b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/CountPrimaryNodeChangeEvents.java new file mode 100644 index 0000000000..4040ca1aa9 --- /dev/null +++ b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/CountPrimaryNodeChangeEvents.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.tests.system; + +import org.apache.nifi.annotation.behavior.TriggerWhenEmpty; +import org.apache.nifi.annotation.configuration.DefaultSchedule; +import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.processor.AbstractSessionFactoryProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessSessionFactory; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; + +import java.util.Collections; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +@TriggerWhenEmpty +@DefaultSchedule(period="100 millis") +public class CountPrimaryNodeChangeEvents extends AbstractSessionFactoryProcessor { + private static final String nodeNumber = System.getProperty("nodeNumber"); + + static final PropertyDescriptor EVENT_SLEEP_DURATION = new PropertyDescriptor.Builder() + .name("Event Sleep Duration") + .displayName("Event Sleep Duration") + .description("The amount of time to sleep when the onPrimaryNodeChange event occurs") + .required(true) + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .defaultValue("0 sec") + .build(); + + @Override + protected List getSupportedPropertyDescriptors() { + return Collections.singletonList(EVENT_SLEEP_DURATION); + } + + private final AtomicReference sessionReference = new AtomicReference<>(); + private volatile long sleepMillis = 0L; + + @OnPrimaryNodeStateChange + public void onPrimaryNodeChange() { + final ProcessSession session = sessionReference.get(); + if (session == null) { + return; + } + + session.adjustCounter("PrimaryNodeChangeCalled-" + nodeNumber, 1L, true); + + try { + Thread.sleep(sleepMillis); + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); + } + + session.adjustCounter("PrimaryNodeChangeCompleted-" + nodeNumber, 1L, true); + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException { + final ProcessSession current = sessionReference.get(); + if (current == null) { + final ProcessSession session = sessionFactory.createSession(); + sessionReference.compareAndSet(null, session); + } + + sleepMillis = context.getProperty(EVENT_SLEEP_DURATION).asTimePeriod(TimeUnit.MILLISECONDS); + sessionReference.get().adjustCounter("Triggers-" + nodeNumber, 1L, true); + } +} diff --git a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index 83e1f6a3fd..a7919d8331 100644 --- a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -31,6 +31,7 @@ org.apache.nifi.processors.tests.system.LoopFlowFile org.apache.nifi.processors.tests.system.PartitionText org.apache.nifi.processors.tests.system.PassThrough org.apache.nifi.processors.tests.system.PassThroughRequiresInstanceClassLoading +org.apache.nifi.processors.tests.system.CountPrimaryNodeChangeEvents org.apache.nifi.processors.tests.system.ReplaceWithFile org.apache.nifi.processors.tests.system.ReverseContents org.apache.nifi.processors.tests.system.RoundRobinFlowFiles diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java index bacb480a5a..5ccd54ae5c 100644 --- a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java +++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java @@ -213,23 +213,24 @@ public abstract class NiFiSystemIT implements NiFiInstanceProvider { final long maxTime = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(60); while (true) { + int connectedNodeCount = -1; try { final ClusteSummaryEntity clusterSummary = client.getFlowClient().getClusterSummary(); - final int connectedNodeCount = clusterSummary.getClusterSummary().getConnectedNodeCount(); + connectedNodeCount = clusterSummary.getClusterSummary().getConnectedNodeCount(); if (connectedNodeCount == expectedNumberOfNodes) { logger.info("Wait successful, {} nodes connected", expectedNumberOfNodes); return; } logEverySecond("Waiting for {} nodes to connect but currently only {} nodes are connected", expectedNumberOfNodes, connectedNodeCount); - - if (System.currentTimeMillis() > maxTime) { - throw new RuntimeException("Waited up to 60 seconds for both nodes to connect but only " + connectedNodeCount + " nodes connected"); - } } catch (final Exception e) { e.printStackTrace(); } + if (System.currentTimeMillis() > maxTime) { + throw new RuntimeException("Waited up to 60 seconds for both nodes to connect but only " + connectedNodeCount + " nodes connected"); + } + try { Thread.sleep(sleepMillis); } catch (final InterruptedException ie) { diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/PrimaryNodeChangeNotificationIT.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/PrimaryNodeChangeNotificationIT.java new file mode 100644 index 0000000000..d8e75251f8 --- /dev/null +++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/PrimaryNodeChangeNotificationIT.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.tests.system.clustering; + +import org.apache.nifi.cluster.coordination.node.ClusterRoles; +import org.apache.nifi.tests.system.NiFiInstanceFactory; +import org.apache.nifi.tests.system.NiFiSystemIT; +import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException; +import org.apache.nifi.toolkit.cli.impl.client.nifi.ProcessorClient; +import org.apache.nifi.web.api.dto.NodeDTO; +import org.apache.nifi.web.api.entity.ClusterEntity; +import org.apache.nifi.web.api.entity.ProcessorEntity; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class PrimaryNodeChangeNotificationIT extends NiFiSystemIT { + @Override + public NiFiInstanceFactory getInstanceFactory() { + return createTwoNodeInstanceFactory(); + } + + @Test + public void testNotifications() throws NiFiClientException, IOException, InterruptedException { + final ProcessorEntity processor = getClientUtil().createProcessor("CountPrimaryNodeChangeEvents"); + getClientUtil().startProcessor(processor); + + // Wait for processor to be triggered on both nodes + Map counters = new HashMap<>(); + while (!counters.containsKey("Triggers-1") || !counters.containsKey("Triggers-2")) { + Thread.sleep(10L); + counters = getClientUtil().getCountersAsMap(processor.getId()); + } + + final NodeDTO primaryNode = getNode(ClusterRoles.PRIMARY_NODE, false); + final NodeDTO nonPrimaryNode = getNode(ClusterRoles.PRIMARY_NODE, true); + + getClientUtil().disconnectNode(primaryNode.getNodeId()); + setupClient(nonPrimaryNode.getApiPort()); + waitForNodeStatus(primaryNode, "DISCONNECTED"); + + getClientUtil().connectNode(primaryNode.getNodeId()); + waitForAllNodesConnected(); + + waitFor(() -> { + final Map counterMap = getClientUtil().getCountersAsMap(processor.getId()); + final Long notificationCalledNode1 = counterMap.get("PrimaryNodeChangeCalled-1"); + final Long notificationCalledNode2 = counterMap.get("PrimaryNodeChangeCalled-2"); + final Long notificationCompletedNode1 = counterMap.get("PrimaryNodeChangeCompleted-1"); + final Long notificationCompletedNode2 = counterMap.get("PrimaryNodeChangeCompleted-2"); + + return notificationCalledNode1 > 0 && notificationCalledNode2 > 0 && notificationCompletedNode1 > 0 && notificationCompletedNode2 > 0; + }); + } + + @Test + public void testTerminateNotificationWhenBlocked() throws NiFiClientException, IOException, InterruptedException { + final ProcessorEntity processor = getClientUtil().createProcessor("CountPrimaryNodeChangeEvents"); + + // Set the event sleep duration to 10 minutes so that the processor will be blocked for a while when the primary node changes + getClientUtil().updateProcessorProperties(processor, Collections.singletonMap("Event Sleep Duration", "10 mins")); + getClientUtil().startProcessor(processor); + + // Wait for processor to be triggered on both nodes + Map counters = new HashMap<>(); + while (!counters.containsKey("Triggers-1") || !counters.containsKey("Triggers-2")) { + Thread.sleep(10L); + counters = getClientUtil().getCountersAsMap(processor.getId()); + } + + final NodeDTO primaryNode = getNode(ClusterRoles.PRIMARY_NODE, false); + final NodeDTO nonPrimaryNode = getNode(ClusterRoles.PRIMARY_NODE, true); + + getClientUtil().disconnectNode(primaryNode.getNodeId()); + setupClient(nonPrimaryNode.getApiPort()); + waitForNodeStatus(primaryNode, "DISCONNECTED"); + + getClientUtil().connectNode(primaryNode.getNodeId()); + waitForAllNodesConnected(); + + // Wait until the "called" counter is incremented but hte ChangeCompleted counter is not + waitFor(() -> { + final Map counterMap = getClientUtil().getCountersAsMap(processor.getId()); + final Long notificationCalledNode1 = counterMap.get("PrimaryNodeChangeCalled-1"); + final Long notificationCalledNode2 = counterMap.get("PrimaryNodeChangeCalled-2"); + final Long notificationCompletedNode1 = counterMap.get("PrimaryNodeChangeCompleted-1"); + final Long notificationCompletedNode2 = counterMap.get("PrimaryNodeChangeCompleted-2"); + + return notificationCalledNode1 > 0 && notificationCalledNode2 > 0 && notificationCompletedNode1 == null && notificationCompletedNode2 == null; + }); + + // wait 1 second and check again to make sure the ChangeCompleted counter is still not incremented + Thread.sleep(1000L); + + waitFor(() -> { + final Map counterMap = getClientUtil().getCountersAsMap(processor.getId()); + final Long notificationCompletedNode1 = counterMap.get("PrimaryNodeChangeCompleted-1"); + final Long notificationCompletedNode2 = counterMap.get("PrimaryNodeChangeCompleted-2"); + + return notificationCompletedNode1 == null && notificationCompletedNode2 == null; + }); + + final ProcessorClient processorClient = getNifiClient().getProcessorClient(); + assertTrue(processorClient.getProcessor(processor.getId()).getStatus().getAggregateSnapshot().getActiveThreadCount() > 0); + processorClient.stopProcessor(processor); + + // Wait a bit and make sure we still see a thread + Thread.sleep(1000L); + assertTrue(processorClient.getProcessor(processor.getId()).getStatus().getAggregateSnapshot().getActiveThreadCount() > 0); + + // Terminate the processor + processorClient.terminateProcessor(processor.getId()); + + // Wait for no threads to be active + waitFor(() -> processorClient.getProcessor(processor.getId()).getStatus().getAggregateSnapshot().getActiveThreadCount() == 0); + } + + + private NodeDTO getNode(final String roleName, final boolean invert) throws InterruptedException, NiFiClientException, IOException { + while (true) { + final ClusterEntity clusterEntity = getNifiClient().getControllerClient().getNodes(); + final Optional optionalPrimaryNodeDto = clusterEntity.getCluster().getNodes().stream() + .filter(node -> invert != node.getRoles().contains(roleName)) + .findFirst(); + + if (optionalPrimaryNodeDto.isPresent()) { + return optionalPrimaryNodeDto.get(); + } + + Thread.sleep(100L); + } + } +} diff --git a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyProcessorClient.java b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyProcessorClient.java index bcd22ed046..917829e97d 100644 --- a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyProcessorClient.java +++ b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyProcessorClient.java @@ -288,5 +288,4 @@ public class JerseyProcessorClient extends AbstractJerseyClient implements Proce return getRequestBuilder(target).delete(ProcessorEntity.class); }); } - }