NIFI-11290 Run Component Primary Node State changes in background thread

- Ensure that components are notified that primary node has changed in a background thread instead of the Leader Election thread and activate/deactivate the thread in the case of Processors so that they can be viewed in the UI and terminated

- Fixed system tests that would fail intermittently because they did not wait for node disconnection to complete and did not properly switch the client to look at the connected node before checking cluster status

This closes #7052

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Mark Payne 2023-03-15 13:07:59 -04:00 committed by exceptionfactory
parent 9de93f19a4
commit 016d58612c
No known key found for this signature in database
GPG Key ID: 29B6A52D2AAE8DBA
17 changed files with 368 additions and 21 deletions

View File

@ -30,6 +30,8 @@ import org.apache.nifi.annotation.lifecycle.OnConfigurationRestored;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange;
import org.apache.nifi.annotation.notification.PrimaryNodeState;
import org.apache.nifi.authorization.Resource;
import org.apache.nifi.authorization.resource.Authorizable;
import org.apache.nifi.authorization.resource.ResourceFactory;
@ -90,6 +92,7 @@ import org.slf4j.LoggerFactory;
import java.lang.management.ThreadInfo;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collection;
@ -1598,6 +1601,24 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
}
}
@Override
public void notifyPrimaryNodeChanged(final PrimaryNodeState nodeState, final LifecycleState lifecycleState) {
final Class<?> implClass = getProcessor().getClass();
final List<Method> methods = ReflectionUtils.findMethodsWithAnnotations(implClass, new Class[] {OnPrimaryNodeStateChange.class});
if (methods.isEmpty()) {
return;
}
lifecycleState.incrementActiveThreadCount(null);
activateThread();
try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(getExtensionManager(), implClass, getIdentifier())) {
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnPrimaryNodeStateChange.class, getProcessor(), nodeState);
} finally {
deactivateThread();
lifecycleState.decrementActiveThreadCount();
}
}
private void activateThread() {
final Thread thread = Thread.currentThread();
final Long timestamp = System.currentTimeMillis();

View File

@ -17,6 +17,8 @@
package org.apache.nifi.controller.reporting;
import org.apache.nifi.annotation.configuration.DefaultSchedule;
import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange;
import org.apache.nifi.annotation.notification.PrimaryNodeState;
import org.apache.nifi.bundle.Bundle;
import org.apache.nifi.bundle.BundleCoordinate;
import org.apache.nifi.components.ConfigVerificationResult;
@ -36,6 +38,7 @@ import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.controller.TerminationAwareLogger;
import org.apache.nifi.controller.ValidationContextFactory;
import org.apache.nifi.controller.scheduling.LifecycleState;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceProvider;
import org.apache.nifi.controller.service.StandardConfigurationContext;
@ -50,10 +53,12 @@ import org.apache.nifi.reporting.VerifiableReportingTask;
import org.apache.nifi.scheduling.SchedulingStrategy;
import org.apache.nifi.util.CharacterFilterUtils;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.ReflectionUtils;
import org.apache.nifi.util.file.classloader.ClassLoaderUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.reflect.Method;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collection;
@ -395,4 +400,23 @@ public abstract class AbstractReportingTaskNode extends AbstractComponentNode im
return results;
}
@Override
public void notifyPrimaryNodeChanged(final PrimaryNodeState nodeState, final LifecycleState lifecycleState) {
final Class<?> taskClass = getReportingTask().getClass();
final List<Method> methods = ReflectionUtils.findMethodsWithAnnotations(taskClass, new Class[] {OnPrimaryNodeStateChange.class});
if (methods.isEmpty()) {
return;
}
lifecycleState.incrementActiveThreadCount(null);
try {
try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(getExtensionManager(), taskClass, getIdentifier())) {
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnPrimaryNodeStateChange.class, getReportingTask(), nodeState);
}
} finally {
lifecycleState.decrementActiveThreadCount();
}
}
}

View File

@ -21,6 +21,8 @@ import org.apache.nifi.annotation.behavior.Restricted;
import org.apache.nifi.annotation.documentation.DeprecationNotice;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange;
import org.apache.nifi.annotation.notification.PrimaryNodeState;
import org.apache.nifi.authorization.Resource;
import org.apache.nifi.authorization.resource.Authorizable;
import org.apache.nifi.authorization.resource.ResourceFactory;
@ -66,6 +68,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collections;
@ -772,4 +775,17 @@ public class StandardControllerServiceNode extends AbstractComponentNode impleme
this.bulletinLevel = level;
}
@Override
public void notifyPrimaryNodeChanged(final PrimaryNodeState nodeState) {
final Class<?> implementationClass = getControllerServiceImplementation().getClass();
final List<Method> methods = ReflectionUtils.findMethodsWithAnnotations(implementationClass, new Class[] {OnPrimaryNodeStateChange.class});
if (methods.isEmpty()) {
return;
}
try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(getExtensionManager(), implementationClass, getIdentifier())) {
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnPrimaryNodeStateChange.class, getControllerServiceImplementation(), nodeState);
}
}
}

View File

@ -159,7 +159,7 @@ public class ReflectionUtils {
return isSuccess;
}
private static List<Method> findMethodsWithAnnotations(final Class<?> clazz, final Class<? extends Annotation>[] annotationClasses) {
public static List<Method> findMethodsWithAnnotations(final Class<?> clazz, final Class<? extends Annotation>[] annotationClasses) {
// We use a cache here to store a mapping of Class & Annotation[] to those methods that contain the annotation.
// This is done because discovering this using Reflection is fairly expensive (can take up to tens of milliseconds on laptop).
// While this may not seem like much time, consider deleting a Process Group with thousands of Processors or instantiating

View File

@ -1393,4 +1393,5 @@ public abstract class AbstractComponentNode implements ComponentNode {
public boolean isReferencingParameter(final String parameterName) {
return parameterReferenceCounts.containsKey(parameterName);
}
}

View File

@ -16,6 +16,7 @@
*/
package org.apache.nifi.controller;
import org.apache.nifi.annotation.notification.PrimaryNodeState;
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.connectable.Funnel;
import org.apache.nifi.connectable.Port;
@ -260,4 +261,11 @@ public interface ProcessScheduler {
* @param task the task to perform
*/
Future<?> submitFrameworkTask(Runnable task);
void notifyPrimaryNodeStateChange(ProcessorNode processor, PrimaryNodeState primaryNodeState);
void notifyPrimaryNodeStateChange(ControllerServiceNode service, PrimaryNodeState primaryNodeState);
void notifyPrimaryNodeStateChange(ReportingTaskNode taskNode, PrimaryNodeState primaryNodeState);
}

View File

@ -17,6 +17,8 @@
package org.apache.nifi.controller;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.notification.PrimaryNodeState;
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.validation.ValidationStatus;
import org.apache.nifi.components.validation.ValidationTrigger;
import org.apache.nifi.connectable.Connectable;
@ -27,7 +29,6 @@ import org.apache.nifi.controller.service.ControllerServiceProvider;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.logging.LogLevel;
import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processor.Relationship;
@ -300,4 +301,6 @@ public abstract class ProcessorNode extends AbstractComponentNode implements Con
* @param context The ProcessContext associated with the Processor configuration
*/
public abstract void onConfigurationRestored(ProcessContext context);
public abstract void notifyPrimaryNodeChanged(PrimaryNodeState primaryNodeState, LifecycleState lifecycleState);
}

View File

@ -16,10 +16,12 @@
*/
package org.apache.nifi.controller;
import org.apache.nifi.annotation.notification.PrimaryNodeState;
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.controller.scheduling.LifecycleState;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.reporting.ReportingContext;
import org.apache.nifi.reporting.ReportingTask;
import org.apache.nifi.scheduling.SchedulingStrategy;
@ -132,4 +134,7 @@ public interface ReportingTaskNode extends ComponentNode {
void enable();
void disable();
void notifyPrimaryNodeChanged(PrimaryNodeState primaryNodeState, LifecycleState lifecycleState);
}

View File

@ -16,6 +16,8 @@
*/
package org.apache.nifi.controller.service;
import org.apache.nifi.annotation.notification.PrimaryNodeState;
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.VersionedComponent;
import org.apache.nifi.controller.ComponentNode;
@ -26,7 +28,6 @@ import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.logging.LogLevel;
import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.components.ConfigVerificationResult;
import java.util.List;
import java.util.Map;
@ -241,4 +242,6 @@ public interface ControllerServiceNode extends ComponentNode, VersionedComponent
final LoggableComponent<ControllerService> proxiedControllerService,
final ControllerServiceInvocationHandler invocationHandler);
void notifyPrimaryNodeChanged(PrimaryNodeState primaryNodeState);
}

View File

@ -19,7 +19,6 @@ package org.apache.nifi.controller;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.admin.service.AuditService;
import org.apache.nifi.annotation.lifecycle.OnConfigurationRestored;
import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange;
import org.apache.nifi.annotation.notification.PrimaryNodeState;
import org.apache.nifi.authorization.Authorizer;
import org.apache.nifi.authorization.Resource;
@ -2522,21 +2521,15 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
public void setPrimary(final boolean primary) {
final PrimaryNodeState nodeState = primary ? PrimaryNodeState.ELECTED_PRIMARY_NODE : PrimaryNodeState.PRIMARY_NODE_REVOKED;
final ProcessGroup rootGroup = flowManager.getRootGroup();
for (final ProcessorNode procNode : rootGroup.findAllProcessors()) {
try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(extensionManager, procNode.getProcessor().getClass(), procNode.getIdentifier())) {
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnPrimaryNodeStateChange.class, procNode.getProcessor(), nodeState);
}
processScheduler.submitFrameworkTask(() -> processScheduler.notifyPrimaryNodeStateChange(procNode, nodeState) );
}
for (final ControllerServiceNode serviceNode : flowManager.getAllControllerServices()) {
final Class<?> serviceImplClass = serviceNode.getControllerServiceImplementation().getClass();
try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(extensionManager, serviceImplClass, serviceNode.getIdentifier())) {
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnPrimaryNodeStateChange.class, serviceNode.getControllerServiceImplementation(), nodeState);
}
processScheduler.submitFrameworkTask(() -> processScheduler.notifyPrimaryNodeStateChange(serviceNode, nodeState) );
}
for (final ReportingTaskNode reportingTaskNode : getAllReportingTasks()) {
try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(extensionManager, reportingTaskNode.getReportingTask().getClass(), reportingTaskNode.getIdentifier())) {
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnPrimaryNodeStateChange.class, reportingTaskNode.getReportingTask(), nodeState);
}
processScheduler.submitFrameworkTask(() -> processScheduler.notifyPrimaryNodeStateChange(reportingTaskNode, nodeState) );
}
// update primary

View File

@ -20,6 +20,7 @@ import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnShutdown;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
import org.apache.nifi.annotation.notification.PrimaryNodeState;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.components.state.StateManagerProvider;
import org.apache.nifi.components.validation.ValidationStatus;
@ -451,6 +452,23 @@ public final class StandardProcessScheduler implements ProcessScheduler {
LOG.info("Successfully terminated {} with {} active threads", procNode, tasksTerminated);
}
@Override
public void notifyPrimaryNodeStateChange(final ProcessorNode processor, final PrimaryNodeState primaryNodeState) {
final LifecycleState lifecycleState = getLifecycleState(processor, false);
processor.notifyPrimaryNodeChanged(primaryNodeState, lifecycleState);
}
@Override
public void notifyPrimaryNodeStateChange(final ReportingTaskNode taskNode, final PrimaryNodeState primaryNodeState) {
final LifecycleState lifecycleState = getLifecycleState(taskNode, false);
taskNode.notifyPrimaryNodeChanged(primaryNodeState, lifecycleState);
}
@Override
public void notifyPrimaryNodeStateChange(final ControllerServiceNode service, final PrimaryNodeState primaryNodeState) {
service.notifyPrimaryNodeChanged(primaryNodeState);
}
@Override
public void onProcessorRemoved(final ProcessorNode procNode) {
lifecycleStates.remove(procNode);

View File

@ -19,6 +19,7 @@ package org.apache.nifi.controller.scheduling;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnShutdown;
import org.apache.nifi.annotation.notification.PrimaryNodeState;
import org.apache.nifi.components.validation.ValidationStatus;
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.connectable.Funnel;
@ -316,4 +317,16 @@ public class StatelessProcessScheduler implements ProcessScheduler {
public Future<?> submitFrameworkTask(final Runnable task) {
return null;
}
@Override
public void notifyPrimaryNodeStateChange(final ProcessorNode processor, final PrimaryNodeState primaryNodeState) {
}
@Override
public void notifyPrimaryNodeStateChange(final ControllerServiceNode service, final PrimaryNodeState primaryNodeState) {
}
@Override
public void notifyPrimaryNodeStateChange(final ReportingTaskNode taskNode, final PrimaryNodeState primaryNodeState) {
}
}

View File

@ -0,0 +1,87 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.tests.system;
import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
import org.apache.nifi.annotation.configuration.DefaultSchedule;
import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
@TriggerWhenEmpty
@DefaultSchedule(period="100 millis")
public class CountPrimaryNodeChangeEvents extends AbstractSessionFactoryProcessor {
private static final String nodeNumber = System.getProperty("nodeNumber");
static final PropertyDescriptor EVENT_SLEEP_DURATION = new PropertyDescriptor.Builder()
.name("Event Sleep Duration")
.displayName("Event Sleep Duration")
.description("The amount of time to sleep when the onPrimaryNodeChange event occurs")
.required(true)
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.defaultValue("0 sec")
.build();
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return Collections.singletonList(EVENT_SLEEP_DURATION);
}
private final AtomicReference<ProcessSession> sessionReference = new AtomicReference<>();
private volatile long sleepMillis = 0L;
@OnPrimaryNodeStateChange
public void onPrimaryNodeChange() {
final ProcessSession session = sessionReference.get();
if (session == null) {
return;
}
session.adjustCounter("PrimaryNodeChangeCalled-" + nodeNumber, 1L, true);
try {
Thread.sleep(sleepMillis);
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
}
session.adjustCounter("PrimaryNodeChangeCompleted-" + nodeNumber, 1L, true);
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
final ProcessSession current = sessionReference.get();
if (current == null) {
final ProcessSession session = sessionFactory.createSession();
sessionReference.compareAndSet(null, session);
}
sleepMillis = context.getProperty(EVENT_SLEEP_DURATION).asTimePeriod(TimeUnit.MILLISECONDS);
sessionReference.get().adjustCounter("Triggers-" + nodeNumber, 1L, true);
}
}

View File

@ -31,6 +31,7 @@ org.apache.nifi.processors.tests.system.LoopFlowFile
org.apache.nifi.processors.tests.system.PartitionText
org.apache.nifi.processors.tests.system.PassThrough
org.apache.nifi.processors.tests.system.PassThroughRequiresInstanceClassLoading
org.apache.nifi.processors.tests.system.CountPrimaryNodeChangeEvents
org.apache.nifi.processors.tests.system.ReplaceWithFile
org.apache.nifi.processors.tests.system.ReverseContents
org.apache.nifi.processors.tests.system.RoundRobinFlowFiles

View File

@ -213,22 +213,23 @@ public abstract class NiFiSystemIT implements NiFiInstanceProvider {
final long maxTime = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(60);
while (true) {
int connectedNodeCount = -1;
try {
final ClusteSummaryEntity clusterSummary = client.getFlowClient().getClusterSummary();
final int connectedNodeCount = clusterSummary.getClusterSummary().getConnectedNodeCount();
connectedNodeCount = clusterSummary.getClusterSummary().getConnectedNodeCount();
if (connectedNodeCount == expectedNumberOfNodes) {
logger.info("Wait successful, {} nodes connected", expectedNumberOfNodes);
return;
}
logEverySecond("Waiting for {} nodes to connect but currently only {} nodes are connected", expectedNumberOfNodes, connectedNodeCount);
} catch (final Exception e) {
e.printStackTrace();
}
if (System.currentTimeMillis() > maxTime) {
throw new RuntimeException("Waited up to 60 seconds for both nodes to connect but only " + connectedNodeCount + " nodes connected");
}
} catch (final Exception e) {
e.printStackTrace();
}
try {
Thread.sleep(sleepMillis);

View File

@ -0,0 +1,154 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.tests.system.clustering;
import org.apache.nifi.cluster.coordination.node.ClusterRoles;
import org.apache.nifi.tests.system.NiFiInstanceFactory;
import org.apache.nifi.tests.system.NiFiSystemIT;
import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException;
import org.apache.nifi.toolkit.cli.impl.client.nifi.ProcessorClient;
import org.apache.nifi.web.api.dto.NodeDTO;
import org.apache.nifi.web.api.entity.ClusterEntity;
import org.apache.nifi.web.api.entity.ProcessorEntity;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class PrimaryNodeChangeNotificationIT extends NiFiSystemIT {
@Override
public NiFiInstanceFactory getInstanceFactory() {
return createTwoNodeInstanceFactory();
}
@Test
public void testNotifications() throws NiFiClientException, IOException, InterruptedException {
final ProcessorEntity processor = getClientUtil().createProcessor("CountPrimaryNodeChangeEvents");
getClientUtil().startProcessor(processor);
// Wait for processor to be triggered on both nodes
Map<String, Long> counters = new HashMap<>();
while (!counters.containsKey("Triggers-1") || !counters.containsKey("Triggers-2")) {
Thread.sleep(10L);
counters = getClientUtil().getCountersAsMap(processor.getId());
}
final NodeDTO primaryNode = getNode(ClusterRoles.PRIMARY_NODE, false);
final NodeDTO nonPrimaryNode = getNode(ClusterRoles.PRIMARY_NODE, true);
getClientUtil().disconnectNode(primaryNode.getNodeId());
setupClient(nonPrimaryNode.getApiPort());
waitForNodeStatus(primaryNode, "DISCONNECTED");
getClientUtil().connectNode(primaryNode.getNodeId());
waitForAllNodesConnected();
waitFor(() -> {
final Map<String, Long> counterMap = getClientUtil().getCountersAsMap(processor.getId());
final Long notificationCalledNode1 = counterMap.get("PrimaryNodeChangeCalled-1");
final Long notificationCalledNode2 = counterMap.get("PrimaryNodeChangeCalled-2");
final Long notificationCompletedNode1 = counterMap.get("PrimaryNodeChangeCompleted-1");
final Long notificationCompletedNode2 = counterMap.get("PrimaryNodeChangeCompleted-2");
return notificationCalledNode1 > 0 && notificationCalledNode2 > 0 && notificationCompletedNode1 > 0 && notificationCompletedNode2 > 0;
});
}
@Test
public void testTerminateNotificationWhenBlocked() throws NiFiClientException, IOException, InterruptedException {
final ProcessorEntity processor = getClientUtil().createProcessor("CountPrimaryNodeChangeEvents");
// Set the event sleep duration to 10 minutes so that the processor will be blocked for a while when the primary node changes
getClientUtil().updateProcessorProperties(processor, Collections.singletonMap("Event Sleep Duration", "10 mins"));
getClientUtil().startProcessor(processor);
// Wait for processor to be triggered on both nodes
Map<String, Long> counters = new HashMap<>();
while (!counters.containsKey("Triggers-1") || !counters.containsKey("Triggers-2")) {
Thread.sleep(10L);
counters = getClientUtil().getCountersAsMap(processor.getId());
}
final NodeDTO primaryNode = getNode(ClusterRoles.PRIMARY_NODE, false);
final NodeDTO nonPrimaryNode = getNode(ClusterRoles.PRIMARY_NODE, true);
getClientUtil().disconnectNode(primaryNode.getNodeId());
setupClient(nonPrimaryNode.getApiPort());
waitForNodeStatus(primaryNode, "DISCONNECTED");
getClientUtil().connectNode(primaryNode.getNodeId());
waitForAllNodesConnected();
// Wait until the "called" counter is incremented but hte ChangeCompleted counter is not
waitFor(() -> {
final Map<String, Long> counterMap = getClientUtil().getCountersAsMap(processor.getId());
final Long notificationCalledNode1 = counterMap.get("PrimaryNodeChangeCalled-1");
final Long notificationCalledNode2 = counterMap.get("PrimaryNodeChangeCalled-2");
final Long notificationCompletedNode1 = counterMap.get("PrimaryNodeChangeCompleted-1");
final Long notificationCompletedNode2 = counterMap.get("PrimaryNodeChangeCompleted-2");
return notificationCalledNode1 > 0 && notificationCalledNode2 > 0 && notificationCompletedNode1 == null && notificationCompletedNode2 == null;
});
// wait 1 second and check again to make sure the ChangeCompleted counter is still not incremented
Thread.sleep(1000L);
waitFor(() -> {
final Map<String, Long> counterMap = getClientUtil().getCountersAsMap(processor.getId());
final Long notificationCompletedNode1 = counterMap.get("PrimaryNodeChangeCompleted-1");
final Long notificationCompletedNode2 = counterMap.get("PrimaryNodeChangeCompleted-2");
return notificationCompletedNode1 == null && notificationCompletedNode2 == null;
});
final ProcessorClient processorClient = getNifiClient().getProcessorClient();
assertTrue(processorClient.getProcessor(processor.getId()).getStatus().getAggregateSnapshot().getActiveThreadCount() > 0);
processorClient.stopProcessor(processor);
// Wait a bit and make sure we still see a thread
Thread.sleep(1000L);
assertTrue(processorClient.getProcessor(processor.getId()).getStatus().getAggregateSnapshot().getActiveThreadCount() > 0);
// Terminate the processor
processorClient.terminateProcessor(processor.getId());
// Wait for no threads to be active
waitFor(() -> processorClient.getProcessor(processor.getId()).getStatus().getAggregateSnapshot().getActiveThreadCount() == 0);
}
private NodeDTO getNode(final String roleName, final boolean invert) throws InterruptedException, NiFiClientException, IOException {
while (true) {
final ClusterEntity clusterEntity = getNifiClient().getControllerClient().getNodes();
final Optional<NodeDTO> optionalPrimaryNodeDto = clusterEntity.getCluster().getNodes().stream()
.filter(node -> invert != node.getRoles().contains(roleName))
.findFirst();
if (optionalPrimaryNodeDto.isPresent()) {
return optionalPrimaryNodeDto.get();
}
Thread.sleep(100L);
}
}
}

View File

@ -288,5 +288,4 @@ public class JerseyProcessorClient extends AbstractJerseyClient implements Proce
return getRequestBuilder(target).delete(ProcessorEntity.class);
});
}
}