NIFI-4772: Refactored how the @OnScheduled methods of processors is invoked/monitored. The new method does away with the two previously created 8-thread thread pools and just uses the Timer-Driven thread pool that is used by other framework tasks.

NIFI-4772: Introduced a new thread-pool with 2 threads that will be used for monitoring lifecycle task. This means that if all threads in the timer-driven thead pool are blocked by processors that don't complete their @OnScheduled methods, we have a separate thread pool that at least gives us a chance of interrupting those threads

NIFI-4772: Remove unused import
Signed-off-by: Matthew Burgess <mattyb149@apache.org>

This closes #2403
This commit is contained in:
Mark Payne 2018-01-12 14:12:57 -05:00 committed by Matthew Burgess
parent 2a5e21c11b
commit 14fef2de14
12 changed files with 626 additions and 240 deletions

View File

@ -509,7 +509,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
throw new RuntimeException(e); throw new RuntimeException(e);
} }
processScheduler = new StandardProcessScheduler(this, encryptor, stateManagerProvider, this.nifiProperties); processScheduler = new StandardProcessScheduler(timerDrivenEngineRef.get(), this, encryptor, stateManagerProvider, this.nifiProperties);
eventDrivenWorkerQueue = new EventDrivenWorkerQueue(false, false, processScheduler); eventDrivenWorkerQueue = new EventDrivenWorkerQueue(false, false, processScheduler);
final ProcessContextFactory contextFactory = new ProcessContextFactory(contentRepository, flowFileRepository, flowFileEventRepository, counterRepositoryRef.get(), provenanceRepository); final ProcessContextFactory contextFactory = new ProcessContextFactory(contentRepository, flowFileRepository, flowFileEventRepository, counterRepositoryRef.get(), provenanceRepository);
@ -1661,13 +1661,13 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
/** /**
* Updates the number of threads that can be simultaneously used for * Updates the number of threads that can be simultaneously used for
* executing processors. * executing processors.
* This method must be called while holding the write lock!
* *
* @param maxThreadCount This method must be called while holding the write * @param maxThreadCount max number of threads
* lock!
*/ */
private void setMaxThreadCount(final int maxThreadCount, final FlowEngine engine, final AtomicInteger maxThreads) { private void setMaxThreadCount(final int maxThreadCount, final FlowEngine engine, final AtomicInteger maxThreads) {
if (maxThreadCount < 1) { if (maxThreadCount < 1) {
throw new IllegalArgumentException(); throw new IllegalArgumentException("Cannot set max number of threads to less than 2");
} }
maxThreads.getAndSet(maxThreadCount); maxThreads.getAndSet(maxThreadCount);

View File

@ -18,7 +18,6 @@ package org.apache.nifi.controller;
import static java.util.Objects.requireNonNull; import static java.util.Objects.requireNonNull;
import java.lang.reflect.InvocationTargetException;
import java.net.URL; import java.net.URL;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
@ -32,11 +31,9 @@ import java.util.Optional;
import java.util.Set; import java.util.Set;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
@ -131,12 +128,12 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
private final ProcessScheduler processScheduler; private final ProcessScheduler processScheduler;
private long runNanos = 0L; private long runNanos = 0L;
private volatile long yieldNanos; private volatile long yieldNanos;
private final NiFiProperties nifiProperties;
private volatile ScheduledState desiredState; private volatile ScheduledState desiredState;
private SchedulingStrategy schedulingStrategy; // guarded by read/write lock private SchedulingStrategy schedulingStrategy; // guarded by read/write lock
// ??????? NOT any more // ??????? NOT any more
private ExecutionNode executionNode; private ExecutionNode executionNode;
private final long onScheduleTimeoutMillis;
public StandardProcessorNode(final LoggableComponent<Processor> processor, final String uuid, public StandardProcessorNode(final LoggableComponent<Processor> processor, final String uuid,
final ValidationContextFactory validationContextFactory, final ProcessScheduler scheduler, final ValidationContextFactory validationContextFactory, final ProcessScheduler scheduler,
@ -176,7 +173,9 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
this.processGroup = new AtomicReference<>(); this.processGroup = new AtomicReference<>();
processScheduler = scheduler; processScheduler = scheduler;
penalizationPeriod = new AtomicReference<>(DEFAULT_PENALIZATION_PERIOD); penalizationPeriod = new AtomicReference<>(DEFAULT_PENALIZATION_PERIOD);
this.nifiProperties = nifiProperties;
final String timeoutString = nifiProperties.getProperty(NiFiProperties.PROCESSOR_SCHEDULING_TIMEOUT);
onScheduleTimeoutMillis = timeoutString == null ? 60000 : FormatUtils.getTimeDuration(timeoutString.trim(), TimeUnit.MILLISECONDS);
schedulingStrategy = SchedulingStrategy.TIMER_DRIVEN; schedulingStrategy = SchedulingStrategy.TIMER_DRIVEN;
executionNode = ExecutionNode.ALL; executionNode = ExecutionNode.ALL;
@ -300,6 +299,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
} }
@Override @Override
@SuppressWarnings("deprecation")
public boolean isIsolated() { public boolean isIsolated() {
return schedulingStrategy == SchedulingStrategy.PRIMARY_NODE_ONLY || executionNode == ExecutionNode.PRIMARY; return schedulingStrategy == SchedulingStrategy.PRIMARY_NODE_ONLY || executionNode == ExecutionNode.PRIMARY;
} }
@ -465,6 +465,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
} }
@Override @Override
@SuppressWarnings("deprecation")
public synchronized void setScheduldingPeriod(final String schedulingPeriod) { public synchronized void setScheduldingPeriod(final String schedulingPeriod) {
if (isRunning()) { if (isRunning()) {
throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running"); throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running");
@ -1312,7 +1313,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
} }
if (starting) { // will ensure that the Processor represented by this node can only be started once if (starting) { // will ensure that the Processor represented by this node can only be started once
taskScheduler.execute(() -> initiateStart(taskScheduler, administrativeYieldMillis, processContext, schedulingAgentCallback)); initiateStart(taskScheduler, administrativeYieldMillis, processContext, schedulingAgentCallback);
} else { } else {
final String procName = processorRef.get().toString(); final String procName = processorRef.get().toString();
LOG.warn("Cannot start {} because it is not currently stopped. Current state is {}", procName, currentState); LOG.warn("Cannot start {} because it is not currently stopped. Current state is {}", procName, currentState);
@ -1326,40 +1327,83 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
final Processor processor = getProcessor(); final Processor processor = getProcessor();
final ComponentLog procLog = new SimpleProcessLogger(StandardProcessorNode.this.getIdentifier(), processor); final ComponentLog procLog = new SimpleProcessLogger(StandardProcessorNode.this.getIdentifier(), processor);
try { final long completionTimestamp = System.currentTimeMillis() + onScheduleTimeoutMillis;
invokeTaskAsCancelableFuture(schedulingAgentCallback, new Callable<Void>() {
@Override // Create a task to invoke the @OnScheduled annotation of the processor
public Void call() throws Exception { final Callable<Void> startupTask = () -> {
LOG.debug("Invoking @OnScheduled methods of {}", processor);
try (final NarCloseable nc = NarCloseable.withComponentNarLoader(processor.getClass(), processor.getIdentifier())) { try (final NarCloseable nc = NarCloseable.withComponentNarLoader(processor.getClass(), processor.getIdentifier())) {
try {
ReflectionUtils.invokeMethodsWithAnnotation(OnScheduled.class, processor, processContext); ReflectionUtils.invokeMethodsWithAnnotation(OnScheduled.class, processor, processContext);
return null;
}
}
});
if (scheduledState.compareAndSet(ScheduledState.STARTING, ScheduledState.RUNNING)) { if (scheduledState.compareAndSet(ScheduledState.STARTING, ScheduledState.RUNNING)) {
LOG.debug("Successfully completed the @OnScheduled methods of {}; will now start triggering processor to run", processor);
schedulingAgentCallback.trigger(); // callback provided by StandardProcessScheduler to essentially initiate component's onTrigger() cycle schedulingAgentCallback.trigger(); // callback provided by StandardProcessScheduler to essentially initiate component's onTrigger() cycle
} else { // can only happen if stopProcessor was called before service was transitioned to RUNNING state } else {
try (final NarCloseable nc = NarCloseable.withComponentNarLoader(processor.getClass(), processor.getIdentifier())) { LOG.debug("Successfully invoked @OnScheduled methods of {} but scheduled state is no longer STARTING so will stop processor now", processor);
// can only happen if stopProcessor was called before service was transitioned to RUNNING state
try {
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnUnscheduled.class, processor, processContext); ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnUnscheduled.class, processor, processContext);
} finally {
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, processor, processContext);
} }
scheduledState.set(ScheduledState.STOPPED); scheduledState.set(ScheduledState.STOPPED);
} }
} finally {
schedulingAgentCallback.onTaskComplete();
}
} catch (final Exception e) { } catch (final Exception e) {
final Throwable cause = e instanceof InvocationTargetException ? e.getCause() : e; procLog.error("Failed to properly initialize Processor. If still scheduled to run, NiFi will attempt to "
procLog.error("{} failed to invoke @OnScheduled method due to {}; processor will not be scheduled to run for {} seconds", + "initialize and run the Processor again after the 'Administrative Yield Duration' has elapsed. Failure is due to " + e, e);
new Object[]{StandardProcessorNode.this.getProcessor(), cause, administrativeYieldMillis / 1000L}, cause);
LOG.error("Failed to invoke @OnScheduled method due to {}", cause.toString(), cause);
// If processor's task completed Exceptionally, then we want to retry initiating the start (if Processor is still scheduled to run).
try (final NarCloseable nc = NarCloseable.withComponentNarLoader(processor.getClass(), processor.getIdentifier())) {
try {
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnUnscheduled.class, processor, processContext); ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnUnscheduled.class, processor, processContext);
} finally {
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, processor, processContext); ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, processor, processContext);
}
}
if (scheduledState.get() != ScheduledState.STOPPING) { // make sure we only continue retry loop if STOP action wasn't initiated // make sure we only continue retry loop if STOP action wasn't initiated
taskScheduler.schedule(() -> initiateStart(taskScheduler, administrativeYieldMillis, processContext, schedulingAgentCallback), administrativeYieldMillis, TimeUnit.MILLISECONDS); if (scheduledState.get() != ScheduledState.STOPPING) {
// re-initiate the entire process
final Runnable initiateStartTask = () -> initiateStart(taskScheduler, administrativeYieldMillis, processContext, schedulingAgentCallback);
taskScheduler.schedule(initiateStartTask, administrativeYieldMillis, TimeUnit.MILLISECONDS);
} else { } else {
scheduledState.set(ScheduledState.STOPPED); scheduledState.set(ScheduledState.STOPPED);
} }
} }
return null;
};
// Trigger the task in a background thread.
final Future<?> taskFuture = schedulingAgentCallback.scheduleTask(startupTask);
// Trigger a task periodically to check if @OnScheduled task completed. Once it has,
// this task will call SchedulingAgentCallback#onTaskComplete.
// However, if the task times out, we need to be able to cancel the monitoring. So, in order
// to do this, we use #scheduleWithFixedDelay and then make that Future available to the task
// itself by placing it into an AtomicReference.
final AtomicReference<Future<?>> futureRef = new AtomicReference<>();
final Runnable monitoringTask = new Runnable() {
@Override
public void run() {
Future<?> monitoringFuture = futureRef.get();
if (monitoringFuture == null) { // Future is not yet available. Just return and wait for the next invocation.
return;
}
monitorAsyncTask(taskFuture, monitoringFuture, completionTimestamp);
}
};
final Future<?> future = taskScheduler.scheduleWithFixedDelay(monitoringTask, 1, 10, TimeUnit.MILLISECONDS);
futureRef.set(future);
} }
/** /**
@ -1451,59 +1495,26 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
return future; return future;
} }
/**
* Will invoke lifecycle operation (OnScheduled or OnUnscheduled) private void monitorAsyncTask(final Future<?> taskFuture, final Future<?> monitoringFuture, final long completionTimestamp) {
* asynchronously to ensure that it could be interrupted if stop action was if (taskFuture.isDone()) {
* initiated on the processor that may be infinitely blocking in such monitoringFuture.cancel(false); // stop scheduling this task
* operation. While this approach paves the way for further enhancements } else if (System.currentTimeMillis() > completionTimestamp) {
* related to managing processor'slife-cycle operation at the moment the // Task timed out. Request an interrupt of the processor task
* interrupt will not happen automatically. This is primarily to preserve
* the existing behavior of the NiFi where stop operation can only be
* invoked once the processor is started. Unfortunately that could mean that
* the processor may be blocking indefinitely in lifecycle operation
* (OnScheduled or OnUnscheduled). To deal with that a new NiFi property has
* been introduced <i>nifi.processor.scheduling.timeout</i> which allows one
* to set the time (in milliseconds) of how long to wait before canceling
* such lifecycle operation (OnScheduled or OnUnscheduled) allowing
* processor's stop sequence to proceed. The default value for this property
* is {@link Long#MAX_VALUE}.
* <p>
* NOTE: Canceling the task does not guarantee that the task will actually
* completes (successfully or otherwise), since cancellation of the task
* will issue a simple Thread.interrupt(). However code inside of lifecycle
* operation (OnScheduled or OnUnscheduled) is written purely and will
* ignore thread interrupts you may end up with runaway thread which may
* eventually require NiFi reboot. In any event, the above explanation will
* be logged (WARN) informing a user so further actions could be taken.
* </p>
*/
private <T> void invokeTaskAsCancelableFuture(final SchedulingAgentCallback callback, final Callable<T> task) {
final Processor processor = processorRef.get().getProcessor();
final String timeoutString = nifiProperties.getProperty(NiFiProperties.PROCESSOR_SCHEDULING_TIMEOUT);
final long onScheduleTimeout = timeoutString == null ? 60000
: FormatUtils.getTimeDuration(timeoutString.trim(), TimeUnit.MILLISECONDS);
final Future<?> taskFuture = callback.scheduleTask(task);
try {
taskFuture.get(onScheduleTimeout, TimeUnit.MILLISECONDS);
} catch (final InterruptedException e) {
LOG.warn("Thread was interrupted while waiting for processor '" + processor.getClass().getSimpleName()
+ "' lifecycle OnScheduled operation to finish.");
Thread.currentThread().interrupt();
throw new RuntimeException("Interrupted while executing one of processor's OnScheduled tasks.", e);
} catch (final TimeoutException e) {
taskFuture.cancel(true); taskFuture.cancel(true);
LOG.warn("Timed out while waiting for OnScheduled of '"
+ processor.getClass().getSimpleName() // Stop monitoring the processor. We have interrupted the thread so that's all we can do. If the processor responds to the interrupt, then
+ "' processor to finish. An attempt is made to cancel the task via Thread.interrupt(). However it does not " // it will be re-scheduled. If it does not, then it will either keep the thread indefinitely or eventually finish, at which point
// the Processor will begin running.
monitoringFuture.cancel(false);
final Processor processor = processorRef.get().getProcessor();
LOG.warn("Timed out while waiting for OnScheduled of "
+ processor + " to finish. An attempt is made to cancel the task via Thread.interrupt(). However it does not "
+ "guarantee that the task will be canceled since the code inside current OnScheduled operation may " + "guarantee that the task will be canceled since the code inside current OnScheduled operation may "
+ "have been written to ignore interrupts which may result in a runaway thread. This could lead to more issues, " + "have been written to ignore interrupts which may result in a runaway thread. This could lead to more issues, "
+ "eventually requiring NiFi to be restarted. This is usually a bug in the target Processor '" + "eventually requiring NiFi to be restarted. This is usually a bug in the target Processor '"
+ processor + "' that needs to be documented, reported and eventually fixed."); + processor + "' that needs to be documented, reported and eventually fixed.");
throw new RuntimeException("Timed out while executing one of processor's OnScheduled task.", e);
} catch (final ExecutionException e){
throw new RuntimeException("Failed while executing one of processor's OnScheduled task.", e);
} finally {
callback.onTaskComplete();
} }
} }

View File

@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller.exception;
public class ComponentStartTimeoutException extends Exception {
public ComponentStartTimeoutException(String message) {
super(message);
}
}

View File

@ -77,19 +77,21 @@ public final class StandardProcessScheduler implements ProcessScheduler {
private final ConcurrentMap<Object, ScheduleState> scheduleStates = new ConcurrentHashMap<>(); private final ConcurrentMap<Object, ScheduleState> scheduleStates = new ConcurrentHashMap<>();
private final ScheduledExecutorService frameworkTaskExecutor; private final ScheduledExecutorService frameworkTaskExecutor;
private final ConcurrentMap<SchedulingStrategy, SchedulingAgent> strategyAgentMap = new ConcurrentHashMap<>(); private final ConcurrentMap<SchedulingStrategy, SchedulingAgent> strategyAgentMap = new ConcurrentHashMap<>();
// thread pool for starting/stopping components
private final ScheduledExecutorService componentLifeCycleThreadPool = new FlowEngine(8, "StandardProcessScheduler", true); // thread pool for starting/stopping components
private final ScheduledExecutorService componentMonitoringThreadPool = new FlowEngine(8, "StandardProcessScheduler", true); private final ScheduledExecutorService componentLifeCycleThreadPool;
private final ScheduledExecutorService componentMonitoringThreadPool = new FlowEngine(2, "Monitor Processore Lifecycle", true);
private final StringEncryptor encryptor; private final StringEncryptor encryptor;
public StandardProcessScheduler( public StandardProcessScheduler(
final FlowEngine componentLifecycleThreadPool,
final ControllerServiceProvider controllerServiceProvider, final ControllerServiceProvider controllerServiceProvider,
final StringEncryptor encryptor, final StringEncryptor encryptor,
final StateManagerProvider stateManagerProvider, final StateManagerProvider stateManagerProvider,
final NiFiProperties nifiProperties final NiFiProperties nifiProperties
) { ) {
this.componentLifeCycleThreadPool = componentLifecycleThreadPool;
this.controllerServiceProvider = controllerServiceProvider; this.controllerServiceProvider = controllerServiceProvider;
this.encryptor = encryptor; this.encryptor = encryptor;
this.stateManagerProvider = stateManagerProvider; this.stateManagerProvider = stateManagerProvider;
@ -164,7 +166,6 @@ public final class StandardProcessScheduler implements ProcessScheduler {
frameworkTaskExecutor.shutdown(); frameworkTaskExecutor.shutdown();
componentLifeCycleThreadPool.shutdown(); componentLifeCycleThreadPool.shutdown();
componentMonitoringThreadPool.shutdown();
} }
@Override @Override
@ -313,7 +314,7 @@ public final class StandardProcessScheduler implements ProcessScheduler {
@Override @Override
public Future<?> scheduleTask(Callable<?> task) { public Future<?> scheduleTask(Callable<?> task) {
scheduleState.incrementActiveThreadCount(); scheduleState.incrementActiveThreadCount();
return componentMonitoringThreadPool.submit(task); return componentLifeCycleThreadPool.submit(task);
} }
@Override @Override
@ -323,7 +324,7 @@ public final class StandardProcessScheduler implements ProcessScheduler {
}; };
LOG.info("Starting {}", procNode); LOG.info("Starting {}", procNode);
procNode.start(this.componentLifeCycleThreadPool, this.administrativeYieldMillis, processContext, callback, failIfStopping); procNode.start(this.componentMonitoringThreadPool, this.administrativeYieldMillis, processContext, callback, failIfStopping);
return future; return future;
} }

View File

@ -100,7 +100,7 @@ public class TestStandardProcessorNode {
final LoggableComponent<Processor> loggableComponent = new LoggableComponent<>(processor, coordinate, null); final LoggableComponent<Processor> loggableComponent = new LoggableComponent<>(processor, coordinate, null);
final StandardProcessorNode procNode = new StandardProcessorNode(loggableComponent, uuid, createValidationContextFactory(), null, null, final StandardProcessorNode procNode = new StandardProcessorNode(loggableComponent, uuid, createValidationContextFactory(), null, null,
NiFiProperties.createBasicNiFiProperties(null, null), new StandardComponentVariableRegistry(VariableRegistry.EMPTY_REGISTRY), reloadComponent); NiFiProperties.createBasicNiFiProperties(null, null), new StandardComponentVariableRegistry(VariableRegistry.EMPTY_REGISTRY), reloadComponent);
final ScheduledExecutorService taskScheduler = new FlowEngine(2, "TestClasspathResources", true); final ScheduledExecutorService taskScheduler = new FlowEngine(1, "TestClasspathResources", true);
final StandardProcessContext processContext = new StandardProcessContext(procNode, null, null, null); final StandardProcessContext processContext = new StandardProcessContext(procNode, null, null, null);
final SchedulingAgentCallback schedulingAgentCallback = new SchedulingAgentCallback() { final SchedulingAgentCallback schedulingAgentCallback = new SchedulingAgentCallback() {

View File

@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller.scheduling;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
public class LongEnablingService extends AbstractControllerService {
private final AtomicInteger enableCounter = new AtomicInteger();
private final AtomicInteger disableCounter = new AtomicInteger();
private volatile long limit;
@OnEnabled
public void enable(final ConfigurationContext context) throws Exception {
this.enableCounter.incrementAndGet();
Thread.sleep(limit);
}
@OnDisabled
public void disable(final ConfigurationContext context) {
this.disableCounter.incrementAndGet();
}
public int enableInvocationCount() {
return this.enableCounter.get();
}
public int disableInvocationCount() {
return this.disableCounter.get();
}
public void setLimit(final long limit) {
this.limit = limit;
}
}

View File

@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller.scheduling;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.util.Collections;
import org.apache.nifi.bundle.Bundle;
import org.apache.nifi.components.state.StateManagerProvider;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceState;
import org.apache.nifi.controller.service.StandardControllerServiceProvider;
import org.apache.nifi.engine.FlowEngine;
import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.nar.SystemBundle;
import org.apache.nifi.registry.VariableRegistry;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.NiFiProperties;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
public class StandardProcessSchedulerIT {
private final StateManagerProvider stateMgrProvider = Mockito.mock(StateManagerProvider.class);
private VariableRegistry variableRegistry = VariableRegistry.ENVIRONMENT_SYSTEM_REGISTRY;
private FlowController controller;
private NiFiProperties nifiProperties;
private Bundle systemBundle;
private volatile String propsFile = TestStandardProcessScheduler.class.getResource("/standardprocessschedulertest.nifi.properties").getFile();
@Before
public void setup() throws InitializationException {
this.nifiProperties = NiFiProperties.createBasicNiFiProperties(propsFile, null);
// load the system bundle
systemBundle = SystemBundle.create(nifiProperties);
ExtensionManager.discoverExtensions(systemBundle, Collections.emptySet());
controller = Mockito.mock(FlowController.class);
}
/**
* Validates that the service that is currently in ENABLING state can be
* disabled and that its @OnDisabled operation will be invoked as soon as
*
* @OnEnable finishes.
*/
@Test
public void validateLongEnablingServiceCanStillBeDisabled() throws Exception {
final StandardProcessScheduler scheduler = new StandardProcessScheduler(new FlowEngine(1, "Unit Test", true), null, null, stateMgrProvider, nifiProperties);
final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, scheduler, null, stateMgrProvider, variableRegistry, nifiProperties);
final ControllerServiceNode serviceNode = provider.createControllerService(LongEnablingService.class.getName(),
"1", systemBundle.getBundleDetails().getCoordinate(), null, false);
final LongEnablingService ts = (LongEnablingService) serviceNode.getControllerServiceImplementation();
ts.setLimit(3000);
scheduler.enableControllerService(serviceNode);
Thread.sleep(2000);
assertTrue(serviceNode.isActive());
assertEquals(1, ts.enableInvocationCount());
Thread.sleep(500);
scheduler.disableControllerService(serviceNode);
assertFalse(serviceNode.isActive());
assertEquals(ControllerServiceState.DISABLING, serviceNode.getState());
assertEquals(0, ts.disableInvocationCount());
// wait a bit. . . Enabling will finish and @OnDisabled will be invoked
// automatically
Thread.sleep(4000);
assertEquals(ControllerServiceState.DISABLED, serviceNode.getState());
assertEquals(1, ts.disableInvocationCount());
}
}

View File

@ -339,7 +339,7 @@ public class TestProcessorLifecycle {
TestProcessor testProcessor = (TestProcessor) testProcNode.getProcessor(); TestProcessor testProcessor = (TestProcessor) testProcNode.getProcessor();
// sets the scenario for the processor to run // sets the scenario for the processor to run
int delay = 2000; int delay = 200;
this.longRunningOnSchedule(testProcessor, delay); this.longRunningOnSchedule(testProcessor, delay);
ProcessScheduler ps = fc.getProcessScheduler(); ProcessScheduler ps = fc.getProcessScheduler();
@ -348,9 +348,10 @@ public class TestProcessorLifecycle {
ps.stopProcessor(testProcNode); ps.stopProcessor(testProcNode);
assertCondition(() -> ScheduledState.STOPPED == testProcNode.getScheduledState(), 5000L); assertCondition(() -> ScheduledState.STOPPED == testProcNode.getScheduledState(), 5000L);
assertCondition(() -> testProcessor.operationNames.size() == 2, 8000L); assertCondition(() -> testProcessor.operationNames.size() == 3, 8000L);
assertEquals("@OnScheduled", testProcessor.operationNames.get(0)); assertEquals("@OnScheduled", testProcessor.operationNames.get(0));
assertEquals("@OnUnscheduled", testProcessor.operationNames.get(1)); assertEquals("@OnUnscheduled", testProcessor.operationNames.get(1));
assertEquals("@OnStopped", testProcessor.operationNames.get(2));
} }
/** /**
@ -442,7 +443,7 @@ public class TestProcessorLifecycle {
*/ */
@Test @Test
public void validateProcessorCanBeStoppedWhenOnScheduledBlocksIndefinitelyUninterruptable() throws Exception { public void validateProcessorCanBeStoppedWhenOnScheduledBlocksIndefinitelyUninterruptable() throws Exception {
final FlowControllerAndSystemBundle fcsb = this.buildFlowControllerForTest(NiFiProperties.PROCESSOR_SCHEDULING_TIMEOUT, "5 sec"); final FlowControllerAndSystemBundle fcsb = this.buildFlowControllerForTest(NiFiProperties.PROCESSOR_SCHEDULING_TIMEOUT, "1 sec");
fc = fcsb.getFlowController(); fc = fcsb.getFlowController();
ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString()); ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString());
@ -462,7 +463,7 @@ public class TestProcessorLifecycle {
} }
/** /**
* Validates that processor can be stopped if onTrigger() keeps trowing * Validates that processor can be stopped if onTrigger() keeps throwing
* exceptions. * exceptions.
*/ */
@Test @Test
@ -593,7 +594,7 @@ public class TestProcessorLifecycle {
testProcNodeB.setProperties(properties); testProcNodeB.setProperties(properties);
testGroup.addProcessor(testProcNodeB); testGroup.addProcessor(testProcNodeB);
Collection<String> relationNames = new ArrayList<String>(); Collection<String> relationNames = new ArrayList<>();
relationNames.add("relation"); relationNames.add("relation");
Connection connection = fc.createConnection(UUID.randomUUID().toString(), Connection.class.getName(), testProcNodeA, testProcNodeB, relationNames); Connection connection = fc.createConnection(UUID.randomUUID().toString(), Connection.class.getName(), testProcNodeA, testProcNodeB, relationNames);
testGroup.addConnection(connection); testGroup.addConnection(connection);

View File

@ -55,12 +55,14 @@ import org.apache.nifi.controller.ValidationContextFactory;
import org.apache.nifi.controller.cluster.Heartbeater; import org.apache.nifi.controller.cluster.Heartbeater;
import org.apache.nifi.controller.reporting.StandardReportingInitializationContext; import org.apache.nifi.controller.reporting.StandardReportingInitializationContext;
import org.apache.nifi.controller.reporting.StandardReportingTaskNode; import org.apache.nifi.controller.reporting.StandardReportingTaskNode;
import org.apache.nifi.controller.scheduling.processors.FailOnScheduledProcessor;
import org.apache.nifi.controller.service.ControllerServiceNode; import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceProvider; import org.apache.nifi.controller.service.ControllerServiceProvider;
import org.apache.nifi.controller.service.ControllerServiceState; import org.apache.nifi.controller.service.ControllerServiceState;
import org.apache.nifi.controller.service.StandardControllerServiceNode; import org.apache.nifi.controller.service.StandardControllerServiceNode;
import org.apache.nifi.controller.service.StandardControllerServiceProvider; import org.apache.nifi.controller.service.StandardControllerServiceProvider;
import org.apache.nifi.controller.service.mock.MockProcessGroup; import org.apache.nifi.controller.service.mock.MockProcessGroup;
import org.apache.nifi.engine.FlowEngine;
import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.nar.ExtensionManager; import org.apache.nifi.nar.ExtensionManager;
@ -104,13 +106,16 @@ public class TestStandardProcessScheduler {
@Before @Before
public void setup() throws InitializationException { public void setup() throws InitializationException {
this.nifiProperties = NiFiProperties.createBasicNiFiProperties(propsFile, null); final Map<String, String> overrideProperties = new HashMap<>();
overrideProperties.put(NiFiProperties.ADMINISTRATIVE_YIELD_DURATION, "2 millis");
overrideProperties.put(NiFiProperties.PROCESSOR_SCHEDULING_TIMEOUT, "10 millis");
this.nifiProperties = NiFiProperties.createBasicNiFiProperties(propsFile, overrideProperties);
// load the system bundle // load the system bundle
systemBundle = SystemBundle.create(nifiProperties); systemBundle = SystemBundle.create(nifiProperties);
ExtensionManager.discoverExtensions(systemBundle, Collections.emptySet()); ExtensionManager.discoverExtensions(systemBundle, Collections.emptySet());
scheduler = new StandardProcessScheduler(Mockito.mock(ControllerServiceProvider.class), null, stateMgrProvider, nifiProperties); scheduler = new StandardProcessScheduler(new FlowEngine(1, "Unit Test", true), Mockito.mock(ControllerServiceProvider.class), null, stateMgrProvider, nifiProperties);
scheduler.setSchedulingAgent(SchedulingStrategy.TIMER_DRIVEN, Mockito.mock(SchedulingAgent.class)); scheduler.setSchedulingAgent(SchedulingStrategy.TIMER_DRIVEN, Mockito.mock(SchedulingAgent.class));
reportingTask = new TestReportingTask(); reportingTask = new TestReportingTask();
@ -167,13 +172,13 @@ public class TestStandardProcessScheduler {
scheduler.schedule(taskNode); scheduler.schedule(taskNode);
// Let it try to run a few times. // Let it try to run a few times.
Thread.sleep(1000L); Thread.sleep(25L);
scheduler.unschedule(taskNode); scheduler.unschedule(taskNode);
final int attempts = reportingTask.onScheduleAttempts.get(); final int attempts = reportingTask.onScheduleAttempts.get();
// give it a sec to make sure that it's finished running. // give it a sec to make sure that it's finished running.
Thread.sleep(1500L); Thread.sleep(250L);
final int attemptsAfterStop = reportingTask.onScheduleAttempts.get() - attempts; final int attemptsAfterStop = reportingTask.onScheduleAttempts.get() - attempts;
// allow 1 extra run, due to timing issues that could call it as it's being stopped. // allow 1 extra run, due to timing issues that could call it as it's being stopped.
@ -207,7 +212,7 @@ public class TestStandardProcessScheduler {
scheduler.enableControllerService(service); scheduler.enableControllerService(service);
scheduler.startProcessor(procNode, true); scheduler.startProcessor(procNode, true);
Thread.sleep(1000L); Thread.sleep(25L);
scheduler.stopProcessor(procNode); scheduler.stopProcessor(procNode);
assertTrue(service.isActive()); assertTrue(service.isActive());
@ -215,7 +220,10 @@ public class TestStandardProcessScheduler {
scheduler.disableControllerService(service); scheduler.disableControllerService(service);
assertTrue(service.getState() == ControllerServiceState.DISABLING); assertTrue(service.getState() == ControllerServiceState.DISABLING);
assertFalse(service.isActive()); assertFalse(service.isActive());
Thread.sleep(2000);
while (service.getState() != ControllerServiceState.DISABLED) {
Thread.sleep(5L);
}
assertTrue(service.getState() == ControllerServiceState.DISABLED); assertTrue(service.getState() == ControllerServiceState.DISABLED);
} }
@ -472,37 +480,87 @@ public class TestStandardProcessScheduler {
assertEquals(0, ts.disableInvocationCount()); assertEquals(0, ts.disableInvocationCount());
} }
/** // Test that if processor throws Exception in @OnScheduled, it keeps getting scheduled
* Validates that the service that is currently in ENABLING state can be @Test(timeout = 10000)
* disabled and that its @OnDisabled operation will be invoked as soon as public void testProcessorThrowsExceptionOnScheduledRetry() throws InterruptedException {
* final FailOnScheduledProcessor proc = new FailOnScheduledProcessor();
* @OnEnable finishes. proc.setDesiredFailureCount(3);
*/
@Test
public void validateLongEnablingServiceCanStillBeDisabled() throws Exception {
final StandardProcessScheduler scheduler = createScheduler();
final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, scheduler, null, stateMgrProvider, variableRegistry, nifiProperties);
final ControllerServiceNode serviceNode = provider.createControllerService(LongEnablingService.class.getName(),
"1", systemBundle.getBundleDetails().getCoordinate(), null, false);
final LongEnablingService ts = (LongEnablingService) serviceNode.getControllerServiceImplementation();
ts.setLimit(3000);
scheduler.enableControllerService(serviceNode);
Thread.sleep(2000);
assertTrue(serviceNode.isActive());
assertEquals(1, ts.enableInvocationCount());
Thread.sleep(500); proc.initialize(new StandardProcessorInitializationContext(UUID.randomUUID().toString(), null, null, null, nifiProperties));
scheduler.disableControllerService(serviceNode); final ReloadComponent reloadComponent = Mockito.mock(ReloadComponent.class);
assertFalse(serviceNode.isActive()); final LoggableComponent<Processor> loggableComponent = new LoggableComponent<>(proc, systemBundle.getBundleDetails().getCoordinate(), null);
assertEquals(ControllerServiceState.DISABLING, serviceNode.getState());
assertEquals(0, ts.disableInvocationCount()); final ProcessorNode procNode = new StandardProcessorNode(loggableComponent, UUID.randomUUID().toString(),
// wait a bit. . . Enabling will finish and @OnDisabled will be invoked new StandardValidationContextFactory(controller, variableRegistry),
// automatically scheduler, controller, nifiProperties, new StandardComponentVariableRegistry(VariableRegistry.EMPTY_REGISTRY), reloadComponent);
Thread.sleep(4000);
assertEquals(ControllerServiceState.DISABLED, serviceNode.getState()); rootGroup.addProcessor(procNode);
assertEquals(1, ts.disableInvocationCount());
scheduler.startProcessor(procNode, true);
while (!proc.isSucceess()) {
Thread.sleep(5L);
} }
assertEquals(3, proc.getOnScheduledInvocationCount());
}
// Test that if processor times out in the @OnScheduled but responds to interrupt, it keeps getting scheduled
@Test(timeout = 1000000)
public void testProcessorTimeOutRespondsToInterrupt() throws InterruptedException {
final FailOnScheduledProcessor proc = new FailOnScheduledProcessor();
proc.setDesiredFailureCount(0);
proc.setOnScheduledSleepDuration(20, TimeUnit.MINUTES, true, 1);
proc.initialize(new StandardProcessorInitializationContext(UUID.randomUUID().toString(), null, null, null, nifiProperties));
final ReloadComponent reloadComponent = Mockito.mock(ReloadComponent.class);
final LoggableComponent<Processor> loggableComponent = new LoggableComponent<>(proc, systemBundle.getBundleDetails().getCoordinate(), null);
final ProcessorNode procNode = new StandardProcessorNode(loggableComponent, UUID.randomUUID().toString(),
new StandardValidationContextFactory(controller, variableRegistry),
scheduler, controller, nifiProperties, new StandardComponentVariableRegistry(VariableRegistry.EMPTY_REGISTRY), reloadComponent);
rootGroup.addProcessor(procNode);
scheduler.startProcessor(procNode, true);
while (!proc.isSucceess()) {
Thread.sleep(5L);
}
// The first time that the processor's @OnScheduled method is called, it will sleep for 20 minutes. The scheduler should interrupt
// that thread and then try again. The second time, the Processor will not sleep because setOnScheduledSleepDuration was called
// above with iterations = 1
assertEquals(2, proc.getOnScheduledInvocationCount());
}
// Test that if processor times out in the @OnScheduled and does not respond to interrupt, it is not scheduled again
@Test(timeout = 10000)
public void testProcessorTimeOutNoResponseToInterrupt() throws InterruptedException {
final FailOnScheduledProcessor proc = new FailOnScheduledProcessor();
proc.setDesiredFailureCount(0);
proc.setOnScheduledSleepDuration(20, TimeUnit.MINUTES, false, 1);
proc.initialize(new StandardProcessorInitializationContext(UUID.randomUUID().toString(), null, null, null, nifiProperties));
final ReloadComponent reloadComponent = Mockito.mock(ReloadComponent.class);
final LoggableComponent<Processor> loggableComponent = new LoggableComponent<>(proc, systemBundle.getBundleDetails().getCoordinate(), null);
final ProcessorNode procNode = new StandardProcessorNode(loggableComponent, UUID.randomUUID().toString(),
new StandardValidationContextFactory(controller, variableRegistry),
scheduler, controller, nifiProperties, new StandardComponentVariableRegistry(VariableRegistry.EMPTY_REGISTRY), reloadComponent);
rootGroup.addProcessor(procNode);
scheduler.startProcessor(procNode, true);
Thread.sleep(100L);
assertEquals(1, proc.getOnScheduledInvocationCount());
Thread.sleep(100L);
assertEquals(1, proc.getOnScheduledInvocationCount());
// Allow test to complete.
proc.setAllowSleepInterrupt(true);
}
public static class FailingService extends AbstractControllerService { public static class FailingService extends AbstractControllerService {
@OnEnabled @OnEnabled
@ -512,7 +570,6 @@ public class TestStandardProcessScheduler {
} }
public static class RandomShortDelayEnablingService extends AbstractControllerService { public static class RandomShortDelayEnablingService extends AbstractControllerService {
private final Random random = new Random(); private final Random random = new Random();
@OnEnabled @OnEnabled
@ -526,7 +583,6 @@ public class TestStandardProcessScheduler {
} }
public static class SimpleTestService extends AbstractControllerService { public static class SimpleTestService extends AbstractControllerService {
private final AtomicInteger enableCounter = new AtomicInteger(); private final AtomicInteger enableCounter = new AtomicInteger();
private final AtomicInteger disableCounter = new AtomicInteger(); private final AtomicInteger disableCounter = new AtomicInteger();
@ -549,38 +605,7 @@ public class TestStandardProcessScheduler {
} }
} }
public static class LongEnablingService extends AbstractControllerService {
private final AtomicInteger enableCounter = new AtomicInteger();
private final AtomicInteger disableCounter = new AtomicInteger();
private volatile long limit;
@OnEnabled
public void enable(final ConfigurationContext context) throws Exception {
this.enableCounter.incrementAndGet();
Thread.sleep(limit);
}
@OnDisabled
public void disable(final ConfigurationContext context) {
this.disableCounter.incrementAndGet();
}
public int enableInvocationCount() {
return this.enableCounter.get();
}
public int disableInvocationCount() {
return this.disableCounter.get();
}
public void setLimit(final long limit) {
this.limit = limit;
}
}
private StandardProcessScheduler createScheduler() { private StandardProcessScheduler createScheduler() {
return new StandardProcessScheduler(null, null, stateMgrProvider, nifiProperties); return new StandardProcessScheduler(new FlowEngine(1, "Unit Test", true), null, null, stateMgrProvider, nifiProperties);
} }
} }

View File

@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller.scheduling.processors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.exception.ProcessException;
public class FailOnScheduledProcessor extends AbstractProcessor {
private volatile int invocationCount = 0;
private volatile int desiredFailureCount = 1;
private volatile long onScheduledSleepMillis = 0L;
private volatile int onScheduledSleepIterations = 0;
private volatile boolean allowSleepInterrupt = true;
private final AtomicBoolean succeeded = new AtomicBoolean();
public void setDesiredFailureCount(final int desiredFailureCount) {
this.desiredFailureCount = desiredFailureCount;
}
public void setOnScheduledSleepDuration(final long duration, final TimeUnit unit, final boolean allowInterrupt, final int iterations) {
this.onScheduledSleepMillis = unit.toMillis(duration);
this.onScheduledSleepIterations = iterations;
this.allowSleepInterrupt = allowInterrupt;
}
public void setAllowSleepInterrupt(final boolean allow) {
this.allowSleepInterrupt = allow;
}
@OnScheduled
public void onScheduled() throws InterruptedException {
invocationCount++;
if (invocationCount <= onScheduledSleepIterations && onScheduledSleepMillis > 0L) {
final long sleepFinish = System.currentTimeMillis() + onScheduledSleepMillis;
while (System.currentTimeMillis() < sleepFinish) {
try {
Thread.sleep(Math.max(0, sleepFinish - System.currentTimeMillis()));
} catch (final InterruptedException ie) {
if (allowSleepInterrupt) {
Thread.currentThread().interrupt();
throw ie;
} else {
continue;
}
}
}
}
if (invocationCount < desiredFailureCount) {
throw new ProcessException("Intentional failure for unit test");
} else {
succeeded.set(true);
}
}
public int getOnScheduledInvocationCount() {
return invocationCount;
}
public boolean isSucceess() {
return succeeded.get();
}
@Override
public void onTrigger(org.apache.nifi.processor.ProcessContext context, org.apache.nifi.processor.ProcessSession session) throws ProcessException {
}
}

View File

@ -0,0 +1,158 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller.service;
import static org.junit.Assert.assertTrue;
import java.beans.PropertyDescriptor;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import org.apache.nifi.bundle.Bundle;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.components.state.StateManagerProvider;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.scheduling.StandardProcessScheduler;
import org.apache.nifi.controller.service.mock.MockProcessGroup;
import org.apache.nifi.controller.service.mock.ServiceA;
import org.apache.nifi.controller.service.mock.ServiceB;
import org.apache.nifi.engine.FlowEngine;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.nar.SystemBundle;
import org.apache.nifi.registry.VariableRegistry;
import org.apache.nifi.util.NiFiProperties;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.Mockito;
public class StandardControllerServiceProviderIT {
private static Bundle systemBundle;
private static NiFiProperties niFiProperties;
private static VariableRegistry variableRegistry = VariableRegistry.ENVIRONMENT_SYSTEM_REGISTRY;
private static StateManagerProvider stateManagerProvider = new StateManagerProvider() {
@Override
public StateManager getStateManager(final String componentId) {
return Mockito.mock(StateManager.class);
}
@Override
public void shutdown() {
}
@Override
public void enableClusterProvider() {
}
@Override
public void disableClusterProvider() {
}
@Override
public void onComponentRemoved(final String componentId) {
}
};
@BeforeClass
public static void setNiFiProps() {
System.setProperty(NiFiProperties.PROPERTIES_FILE_PATH, TestStandardControllerServiceProvider.class.getResource("/conf/nifi.properties").getFile());
niFiProperties = NiFiProperties.createBasicNiFiProperties(null, null);
// load the system bundle
systemBundle = SystemBundle.create(niFiProperties);
ExtensionManager.discoverExtensions(systemBundle, Collections.emptySet());
}
/**
* We run the same test 1000 times and prior to bug fix (see NIFI-1143) it
* would fail on some iteration. For more details please see
* {@link PropertyDescriptor}.isDependentServiceEnableable() as well as
* https://issues.apache.org/jira/browse/NIFI-1143
*/
@Test(timeout = 120000)
public void testConcurrencyWithEnablingReferencingServicesGraph() throws InterruptedException {
final StandardProcessScheduler scheduler = new StandardProcessScheduler(new FlowEngine(1, "Unit Test", true), null, null, stateManagerProvider, niFiProperties);
for (int i = 0; i < 5000; i++) {
testEnableReferencingServicesGraph(scheduler);
}
}
public void testEnableReferencingServicesGraph(final StandardProcessScheduler scheduler) {
final FlowController controller = Mockito.mock(FlowController.class);
final ProcessGroup procGroup = new MockProcessGroup(controller);
Mockito.when(controller.getGroup(Mockito.anyString())).thenReturn(procGroup);
final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, scheduler, null, stateManagerProvider, variableRegistry, niFiProperties);
// build a graph of controller services with dependencies as such:
//
// A -> B -> D
// C ---^----^
//
// In other words, A references B, which references D.
// AND
// C references B and D.
//
// So we have to verify that if D is enabled, when we enable its referencing services,
// we enable C and B, even if we attempt to enable C before B... i.e., if we try to enable C, we cannot do so
// until B is first enabled so ensure that we enable B first.
final ControllerServiceNode serviceNode1 = provider.createControllerService(ServiceA.class.getName(), "1",
systemBundle.getBundleDetails().getCoordinate(), null, false);
final ControllerServiceNode serviceNode2 = provider.createControllerService(ServiceA.class.getName(), "2",
systemBundle.getBundleDetails().getCoordinate(), null, false);
final ControllerServiceNode serviceNode3 = provider.createControllerService(ServiceA.class.getName(), "3",
systemBundle.getBundleDetails().getCoordinate(), null, false);
final ControllerServiceNode serviceNode4 = provider.createControllerService(ServiceB.class.getName(), "4",
systemBundle.getBundleDetails().getCoordinate(), null, false);
procGroup.addControllerService(serviceNode1);
procGroup.addControllerService(serviceNode2);
procGroup.addControllerService(serviceNode3);
procGroup.addControllerService(serviceNode4);
setProperty(serviceNode1, ServiceA.OTHER_SERVICE.getName(), "2");
setProperty(serviceNode2, ServiceA.OTHER_SERVICE.getName(), "4");
setProperty(serviceNode3, ServiceA.OTHER_SERVICE.getName(), "2");
setProperty(serviceNode3, ServiceA.OTHER_SERVICE_2.getName(), "4");
provider.enableControllerService(serviceNode4);
provider.enableReferencingServices(serviceNode4);
// Verify that the services are either ENABLING or ENABLED, and wait for all of them to become ENABLED.
// Note that we set a timeout of 10 seconds, in case a bug occurs and the services never become ENABLED.
final Set<ControllerServiceState> validStates = new HashSet<>();
validStates.add(ControllerServiceState.ENABLED);
validStates.add(ControllerServiceState.ENABLING);
while (serviceNode3.getState() != ControllerServiceState.ENABLED || serviceNode2.getState() != ControllerServiceState.ENABLED || serviceNode1.getState() != ControllerServiceState.ENABLED) {
assertTrue(validStates.contains(serviceNode3.getState()));
assertTrue(validStates.contains(serviceNode2.getState()));
assertTrue(validStates.contains(serviceNode1.getState()));
}
}
private void setProperty(ControllerServiceNode serviceNode, String propName, String propValue) {
Map<String, String> props = new LinkedHashMap<>();
props.put(propName, propValue);
serviceNode.setProperties(props);
}
}

View File

@ -21,14 +21,11 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import java.beans.PropertyDescriptor;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap; import java.util.LinkedHashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
@ -48,6 +45,7 @@ import org.apache.nifi.controller.service.mock.MockProcessGroup;
import org.apache.nifi.controller.service.mock.ServiceA; import org.apache.nifi.controller.service.mock.ServiceA;
import org.apache.nifi.controller.service.mock.ServiceB; import org.apache.nifi.controller.service.mock.ServiceB;
import org.apache.nifi.controller.service.mock.ServiceC; import org.apache.nifi.controller.service.mock.ServiceC;
import org.apache.nifi.engine.FlowEngine;
import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.groups.StandardProcessGroup; import org.apache.nifi.groups.StandardProcessGroup;
import org.apache.nifi.nar.ExtensionManager; import org.apache.nifi.nar.ExtensionManager;
@ -130,7 +128,7 @@ public class TestStandardControllerServiceProvider {
} }
private StandardProcessScheduler createScheduler() { private StandardProcessScheduler createScheduler() {
return new StandardProcessScheduler(null, null, stateManagerProvider, niFiProperties); return new StandardProcessScheduler(new FlowEngine(1, "Unit Test", true), null, null, stateManagerProvider, niFiProperties);
} }
private void setProperty(ControllerServiceNode serviceNode, String propName, String propValue) { private void setProperty(ControllerServiceNode serviceNode, String propName, String propValue) {
@ -205,75 +203,6 @@ public class TestStandardControllerServiceProvider {
} }
} }
/**
* We run the same test 1000 times and prior to bug fix (see NIFI-1143) it
* would fail on some iteration. For more details please see
* {@link PropertyDescriptor}.isDependentServiceEnableable() as well as
* https://issues.apache.org/jira/browse/NIFI-1143
*/
@Test(timeout = 120000)
public void testConcurrencyWithEnablingReferencingServicesGraph() throws InterruptedException {
final StandardProcessScheduler scheduler = createScheduler();
for (int i = 0; i < 5000; i++) {
testEnableReferencingServicesGraph(scheduler);
}
}
public void testEnableReferencingServicesGraph(final StandardProcessScheduler scheduler) {
final ProcessGroup procGroup = new MockProcessGroup(controller);
final FlowController controller = Mockito.mock(FlowController.class);
Mockito.when(controller.getGroup(Mockito.anyString())).thenReturn(procGroup);
final StandardControllerServiceProvider provider =
new StandardControllerServiceProvider(controller, scheduler, null, stateManagerProvider, variableRegistry, niFiProperties);
// build a graph of controller services with dependencies as such:
//
// A -> B -> D
// C ---^----^
//
// In other words, A references B, which references D.
// AND
// C references B and D.
//
// So we have to verify that if D is enabled, when we enable its referencing services,
// we enable C and B, even if we attempt to enable C before B... i.e., if we try to enable C, we cannot do so
// until B is first enabled so ensure that we enable B first.
final ControllerServiceNode serviceNode1 = provider.createControllerService(ServiceA.class.getName(), "1",
systemBundle.getBundleDetails().getCoordinate(), null, false);
final ControllerServiceNode serviceNode2 = provider.createControllerService(ServiceA.class.getName(), "2",
systemBundle.getBundleDetails().getCoordinate(), null, false);
final ControllerServiceNode serviceNode3 = provider.createControllerService(ServiceA.class.getName(), "3",
systemBundle.getBundleDetails().getCoordinate(), null, false);
final ControllerServiceNode serviceNode4 = provider.createControllerService(ServiceB.class.getName(), "4",
systemBundle.getBundleDetails().getCoordinate(), null, false);
procGroup.addControllerService(serviceNode1);
procGroup.addControllerService(serviceNode2);
procGroup.addControllerService(serviceNode3);
procGroup.addControllerService(serviceNode4);
setProperty(serviceNode1, ServiceA.OTHER_SERVICE.getName(), "2");
setProperty(serviceNode2, ServiceA.OTHER_SERVICE.getName(), "4");
setProperty(serviceNode3, ServiceA.OTHER_SERVICE.getName(), "2");
setProperty(serviceNode3, ServiceA.OTHER_SERVICE_2.getName(), "4");
provider.enableControllerService(serviceNode4);
provider.enableReferencingServices(serviceNode4);
// Verify that the services are either ENABLING or ENABLED, and wait for all of them to become ENABLED.
// Note that we set a timeout of 10 seconds, in case a bug occurs and the services never become ENABLED.
final Set<ControllerServiceState> validStates = new HashSet<>();
validStates.add(ControllerServiceState.ENABLED);
validStates.add(ControllerServiceState.ENABLING);
while (serviceNode3.getState() != ControllerServiceState.ENABLED || serviceNode2.getState() != ControllerServiceState.ENABLED || serviceNode1.getState() != ControllerServiceState.ENABLED) {
assertTrue(validStates.contains(serviceNode3.getState()));
assertTrue(validStates.contains(serviceNode2.getState()));
assertTrue(validStates.contains(serviceNode1.getState()));
}
}
@Test @Test
public void testOrderingOfServices() { public void testOrderingOfServices() {
final ProcessGroup procGroup = new MockProcessGroup(controller); final ProcessGroup procGroup = new MockProcessGroup(controller);