From be0949570a66f672e128ac97c936df546c7d2521 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Thu, 15 Nov 2018 14:26:36 -0500 Subject: [PATCH] NIFI-5824: Added unit test to FlowController to ensure that the ProcessScheduler that it creates is properly initialized. Also updated the properties file used by TestFlowController to use a VolatileContentRepository instead of FileSystemRepository, and fixed EventDrivenWorkerQueue to return if calls to poll() are interrupted (via Thread.interrupt) - making these minor fixes resulted in the unit test TestFlowController running in 2 seconds instead of 30+ seconds on my machine This closes #3173. Signed-off-by: Bryan Bende --- .../controller/EventDrivenWorkerQueue.java | 12 ++++++----- .../scheduling/StandardProcessScheduler.java | 9 ++++++-- .../nifi/controller/TestFlowController.java | 21 +++++++++++++++++++ .../flowcontrollertest.nifi.properties | 3 +++ 4 files changed, 38 insertions(+), 7 deletions(-) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/EventDrivenWorkerQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/EventDrivenWorkerQueue.java index f36a459514..25e8a86f11 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/EventDrivenWorkerQueue.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/EventDrivenWorkerQueue.java @@ -16,6 +16,11 @@ */ package org.apache.nifi.controller; +import org.apache.nifi.connectable.Connectable; +import org.apache.nifi.connectable.Connection; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.util.Connectables; + import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; @@ -25,11 +30,6 @@ import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import org.apache.nifi.connectable.Connectable; -import org.apache.nifi.connectable.Connection; -import org.apache.nifi.processor.Relationship; -import org.apache.nifi.util.Connectables; - public class EventDrivenWorkerQueue implements WorkerQueue { private final Object workMonitor = new Object(); @@ -69,6 +69,8 @@ public class EventDrivenWorkerQueue implements WorkerQueue { try { workMonitor.wait(timeLeft); } catch (final InterruptedException ignored) { + Thread.currentThread().interrupt(); + return null; } } else { // Decrement the amount of work there is to do for this worker. diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java index 2902f6a11c..2ff3307533 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java @@ -35,6 +35,7 @@ import org.apache.nifi.controller.SchedulingAgentCallback; import org.apache.nifi.controller.StandardProcessorNode; import org.apache.nifi.controller.exception.ProcessorInstantiationException; import org.apache.nifi.controller.service.ControllerServiceNode; +import org.apache.nifi.controller.service.ControllerServiceProvider; import org.apache.nifi.encrypt.StringEncryptor; import org.apache.nifi.engine.FlowEngine; import org.apache.nifi.logging.ComponentLog; @@ -103,6 +104,10 @@ public final class StandardProcessScheduler implements ProcessScheduler { frameworkTaskExecutor = new FlowEngine(4, "Framework Task Thread"); } + public ControllerServiceProvider getControllerServiceProvider() { + return flowController.getControllerServiceProvider(); + } + private StateManager getStateManager(final String componentId) { return stateManagerProvider.getStateManager(componentId); } @@ -293,7 +298,7 @@ public final class StandardProcessScheduler implements ProcessScheduler { public synchronized CompletableFuture startProcessor(final ProcessorNode procNode, final boolean failIfStopping) { final LifecycleState lifecycleState = getLifecycleState(requireNonNull(procNode), true); - final StandardProcessContext processContext = new StandardProcessContext(procNode, flowController.getControllerServiceProvider(), + final StandardProcessContext processContext = new StandardProcessContext(procNode, getControllerServiceProvider(), this.encryptor, getStateManager(procNode.getIdentifier()), lifecycleState::isTerminated); final CompletableFuture future = new CompletableFuture<>(); @@ -333,7 +338,7 @@ public final class StandardProcessScheduler implements ProcessScheduler { public synchronized CompletableFuture stopProcessor(final ProcessorNode procNode) { final LifecycleState lifecycleState = getLifecycleState(procNode, false); - StandardProcessContext processContext = new StandardProcessContext(procNode, flowController.getControllerServiceProvider(), + StandardProcessContext processContext = new StandardProcessContext(procNode, getControllerServiceProvider(), this.encryptor, getStateManager(procNode.getIdentifier()), lifecycleState::isTerminated); LOG.info("Stopping {}", procNode); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFlowController.java index e3c91b29e0..651ce9c5f0 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFlowController.java @@ -34,8 +34,10 @@ import org.apache.nifi.controller.exception.ProcessorInstantiationException; import org.apache.nifi.controller.flow.FlowManager; import org.apache.nifi.controller.reporting.ReportingTaskInstantiationException; import org.apache.nifi.controller.repository.FlowFileEventRepository; +import org.apache.nifi.controller.scheduling.StandardProcessScheduler; import org.apache.nifi.controller.serialization.FlowSynchronizer; import org.apache.nifi.controller.service.ControllerServiceNode; +import org.apache.nifi.controller.service.ControllerServiceProvider; import org.apache.nifi.controller.service.mock.DummyProcessor; import org.apache.nifi.controller.service.mock.DummyReportingTask; import org.apache.nifi.controller.service.mock.ServiceA; @@ -95,6 +97,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -126,6 +129,7 @@ public class TestFlowController { return StringEncryptor.createEncryptor(algorithm, provider, password); } + @Before public void setup() { @@ -337,6 +341,23 @@ public class TestFlowController { controller.synchronize(standardFlowSynchronizer, proposedDataFlow); } + /** + * StandardProcessScheduler is created by FlowController. The StandardProcessScheduler needs access to the Controller Service Provider, + * but the Controller Service Provider needs the ProcessScheduler in its constructor. So the StandardProcessScheduler obtains the Controller Service + * Provider by making a call back to FlowController.getControllerServiceProvider. This test exists to ensure that we always have access to the + * Controller Service Provider in the Process Scheduler, and that we don't inadvertently start storing away the result of calling + * FlowController.getControllerServiceProvider() before the service provider has been fully initialized. + */ + @Test + public void testProcessSchedulerHasAccessToControllerServiceProvider() { + final StandardProcessScheduler scheduler = controller.getProcessScheduler(); + assertNotNull(scheduler); + + final ControllerServiceProvider serviceProvider = scheduler.getControllerServiceProvider(); + assertNotNull(serviceProvider); + assertSame(serviceProvider, controller.getControllerServiceProvider()); + } + @Test public void testSynchronizeFlowWhenCurrentAuthorizationsAreEmptyAndProposedAreNot() { final FlowSynchronizer standardFlowSynchronizer = new StandardFlowSynchronizer( diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/flowcontrollertest.nifi.properties b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/flowcontrollertest.nifi.properties index d9aa4d27c3..a4c1a4a6ca 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/flowcontrollertest.nifi.properties +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/flowcontrollertest.nifi.properties @@ -48,6 +48,9 @@ nifi.swap.out.period=5 sec nifi.swap.out.threads=4 # Content Repository +nifi.content.repository.implementation=org.apache.nifi.controller.repository.VolatileContentRepository +nifi.volatile.content.repository.max.size=1 KB +nifi.volatile.content.repository.block.size=1 KB nifi.content.claim.max.appendable.size=10 MB nifi.content.claim.max.flow.files=100 nifi.content.repository.directory.default=./target/flowcontrollertest/content_repository