NIFI-5824: Added unit test to FlowController to ensure that the ProcessScheduler that it creates is properly initialized. Also updated the properties file used by TestFlowController to use a VolatileContentRepository instead of FileSystemRepository, and fixed EventDrivenWorkerQueue to return if calls to poll() are interrupted (via Thread.interrupt) - making these minor fixes resulted in the unit test TestFlowController running in 2 seconds instead of 30+ seconds on my machine

This closes #3173.

Signed-off-by: Bryan Bende <bbende@apache.org>
This commit is contained in:
Mark Payne 2018-11-15 14:26:36 -05:00 committed by Bryan Bende
parent 76b0065a67
commit be0949570a
No known key found for this signature in database
GPG Key ID: A0DDA9ED50711C39
4 changed files with 38 additions and 7 deletions

View File

@ -16,6 +16,11 @@
*/
package org.apache.nifi.controller;
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.util.Connectables;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
@ -25,11 +30,6 @@ import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.util.Connectables;
public class EventDrivenWorkerQueue implements WorkerQueue {
private final Object workMonitor = new Object();
@ -69,6 +69,8 @@ public class EventDrivenWorkerQueue implements WorkerQueue {
try {
workMonitor.wait(timeLeft);
} catch (final InterruptedException ignored) {
Thread.currentThread().interrupt();
return null;
}
} else {
// Decrement the amount of work there is to do for this worker.

View File

@ -35,6 +35,7 @@ import org.apache.nifi.controller.SchedulingAgentCallback;
import org.apache.nifi.controller.StandardProcessorNode;
import org.apache.nifi.controller.exception.ProcessorInstantiationException;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceProvider;
import org.apache.nifi.encrypt.StringEncryptor;
import org.apache.nifi.engine.FlowEngine;
import org.apache.nifi.logging.ComponentLog;
@ -103,6 +104,10 @@ public final class StandardProcessScheduler implements ProcessScheduler {
frameworkTaskExecutor = new FlowEngine(4, "Framework Task Thread");
}
public ControllerServiceProvider getControllerServiceProvider() {
return flowController.getControllerServiceProvider();
}
private StateManager getStateManager(final String componentId) {
return stateManagerProvider.getStateManager(componentId);
}
@ -293,7 +298,7 @@ public final class StandardProcessScheduler implements ProcessScheduler {
public synchronized CompletableFuture<Void> startProcessor(final ProcessorNode procNode, final boolean failIfStopping) {
final LifecycleState lifecycleState = getLifecycleState(requireNonNull(procNode), true);
final StandardProcessContext processContext = new StandardProcessContext(procNode, flowController.getControllerServiceProvider(),
final StandardProcessContext processContext = new StandardProcessContext(procNode, getControllerServiceProvider(),
this.encryptor, getStateManager(procNode.getIdentifier()), lifecycleState::isTerminated);
final CompletableFuture<Void> future = new CompletableFuture<>();
@ -333,7 +338,7 @@ public final class StandardProcessScheduler implements ProcessScheduler {
public synchronized CompletableFuture<Void> stopProcessor(final ProcessorNode procNode) {
final LifecycleState lifecycleState = getLifecycleState(procNode, false);
StandardProcessContext processContext = new StandardProcessContext(procNode, flowController.getControllerServiceProvider(),
StandardProcessContext processContext = new StandardProcessContext(procNode, getControllerServiceProvider(),
this.encryptor, getStateManager(procNode.getIdentifier()), lifecycleState::isTerminated);
LOG.info("Stopping {}", procNode);

View File

@ -34,8 +34,10 @@ import org.apache.nifi.controller.exception.ProcessorInstantiationException;
import org.apache.nifi.controller.flow.FlowManager;
import org.apache.nifi.controller.reporting.ReportingTaskInstantiationException;
import org.apache.nifi.controller.repository.FlowFileEventRepository;
import org.apache.nifi.controller.scheduling.StandardProcessScheduler;
import org.apache.nifi.controller.serialization.FlowSynchronizer;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceProvider;
import org.apache.nifi.controller.service.mock.DummyProcessor;
import org.apache.nifi.controller.service.mock.DummyReportingTask;
import org.apache.nifi.controller.service.mock.ServiceA;
@ -95,6 +97,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@ -126,6 +129,7 @@ public class TestFlowController {
return StringEncryptor.createEncryptor(algorithm, provider, password);
}
@Before
public void setup() {
@ -337,6 +341,23 @@ public class TestFlowController {
controller.synchronize(standardFlowSynchronizer, proposedDataFlow);
}
/**
* StandardProcessScheduler is created by FlowController. The StandardProcessScheduler needs access to the Controller Service Provider,
* but the Controller Service Provider needs the ProcessScheduler in its constructor. So the StandardProcessScheduler obtains the Controller Service
* Provider by making a call back to FlowController.getControllerServiceProvider. This test exists to ensure that we always have access to the
* Controller Service Provider in the Process Scheduler, and that we don't inadvertently start storing away the result of calling
* FlowController.getControllerServiceProvider() before the service provider has been fully initialized.
*/
@Test
public void testProcessSchedulerHasAccessToControllerServiceProvider() {
final StandardProcessScheduler scheduler = controller.getProcessScheduler();
assertNotNull(scheduler);
final ControllerServiceProvider serviceProvider = scheduler.getControllerServiceProvider();
assertNotNull(serviceProvider);
assertSame(serviceProvider, controller.getControllerServiceProvider());
}
@Test
public void testSynchronizeFlowWhenCurrentAuthorizationsAreEmptyAndProposedAreNot() {
final FlowSynchronizer standardFlowSynchronizer = new StandardFlowSynchronizer(

View File

@ -48,6 +48,9 @@ nifi.swap.out.period=5 sec
nifi.swap.out.threads=4
# Content Repository
nifi.content.repository.implementation=org.apache.nifi.controller.repository.VolatileContentRepository
nifi.volatile.content.repository.max.size=1 KB
nifi.volatile.content.repository.block.size=1 KB
nifi.content.claim.max.appendable.size=10 MB
nifi.content.claim.max.flow.files=100
nifi.content.repository.directory.default=./target/flowcontrollertest/content_repository