diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index 88dc11c3f6..e68000f9c8 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -509,7 +509,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R throw new RuntimeException(e); } - processScheduler = new StandardProcessScheduler(this, encryptor, stateManagerProvider, this.nifiProperties); + processScheduler = new StandardProcessScheduler(timerDrivenEngineRef.get(), this, encryptor, stateManagerProvider, this.nifiProperties); eventDrivenWorkerQueue = new EventDrivenWorkerQueue(false, false, processScheduler); final ProcessContextFactory contextFactory = new ProcessContextFactory(contentRepository, flowFileRepository, flowFileEventRepository, counterRepositoryRef.get(), provenanceRepository); @@ -1661,13 +1661,13 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R /** * Updates the number of threads that can be simultaneously used for * executing processors. + * This method must be called while holding the write lock! * - * @param maxThreadCount This method must be called while holding the write - * lock! + * @param maxThreadCount max number of threads */ private void setMaxThreadCount(final int maxThreadCount, final FlowEngine engine, final AtomicInteger maxThreads) { if (maxThreadCount < 1) { - throw new IllegalArgumentException(); + throw new IllegalArgumentException("Cannot set max number of threads to less than 2"); } maxThreads.getAndSet(maxThreadCount); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java index 187b62fa09..12f4b1e1ed 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java @@ -18,7 +18,6 @@ package org.apache.nifi.controller; import static java.util.Objects.requireNonNull; -import java.lang.reflect.InvocationTargetException; import java.net.URL; import java.util.ArrayList; import java.util.Collection; @@ -32,11 +31,9 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -131,12 +128,12 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable private final ProcessScheduler processScheduler; private long runNanos = 0L; private volatile long yieldNanos; - private final NiFiProperties nifiProperties; private volatile ScheduledState desiredState; private SchedulingStrategy schedulingStrategy; // guarded by read/write lock // ??????? NOT any more private ExecutionNode executionNode; + private final long onScheduleTimeoutMillis; public StandardProcessorNode(final LoggableComponent processor, final String uuid, final ValidationContextFactory validationContextFactory, final ProcessScheduler scheduler, @@ -176,7 +173,9 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable this.processGroup = new AtomicReference<>(); processScheduler = scheduler; penalizationPeriod = new AtomicReference<>(DEFAULT_PENALIZATION_PERIOD); - this.nifiProperties = nifiProperties; + + final String timeoutString = nifiProperties.getProperty(NiFiProperties.PROCESSOR_SCHEDULING_TIMEOUT); + onScheduleTimeoutMillis = timeoutString == null ? 60000 : FormatUtils.getTimeDuration(timeoutString.trim(), TimeUnit.MILLISECONDS); schedulingStrategy = SchedulingStrategy.TIMER_DRIVEN; executionNode = ExecutionNode.ALL; @@ -300,6 +299,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable } @Override + @SuppressWarnings("deprecation") public boolean isIsolated() { return schedulingStrategy == SchedulingStrategy.PRIMARY_NODE_ONLY || executionNode == ExecutionNode.PRIMARY; } @@ -465,6 +465,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable } @Override + @SuppressWarnings("deprecation") public synchronized void setScheduldingPeriod(final String schedulingPeriod) { if (isRunning()) { throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running"); @@ -1312,7 +1313,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable } if (starting) { // will ensure that the Processor represented by this node can only be started once - taskScheduler.execute(() -> initiateStart(taskScheduler, administrativeYieldMillis, processContext, schedulingAgentCallback)); + initiateStart(taskScheduler, administrativeYieldMillis, processContext, schedulingAgentCallback); } else { final String procName = processorRef.get().toString(); LOG.warn("Cannot start {} because it is not currently stopped. Current state is {}", procName, currentState); @@ -1326,40 +1327,83 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable final Processor processor = getProcessor(); final ComponentLog procLog = new SimpleProcessLogger(StandardProcessorNode.this.getIdentifier(), processor); - try { - invokeTaskAsCancelableFuture(schedulingAgentCallback, new Callable() { - @Override - public Void call() throws Exception { - try (final NarCloseable nc = NarCloseable.withComponentNarLoader(processor.getClass(), processor.getIdentifier())) { - ReflectionUtils.invokeMethodsWithAnnotation(OnScheduled.class, processor, processContext); - return null; + final long completionTimestamp = System.currentTimeMillis() + onScheduleTimeoutMillis; + + // Create a task to invoke the @OnScheduled annotation of the processor + final Callable startupTask = () -> { + LOG.debug("Invoking @OnScheduled methods of {}", processor); + + try (final NarCloseable nc = NarCloseable.withComponentNarLoader(processor.getClass(), processor.getIdentifier())) { + try { + ReflectionUtils.invokeMethodsWithAnnotation(OnScheduled.class, processor, processContext); + + if (scheduledState.compareAndSet(ScheduledState.STARTING, ScheduledState.RUNNING)) { + LOG.debug("Successfully completed the @OnScheduled methods of {}; will now start triggering processor to run", processor); + schedulingAgentCallback.trigger(); // callback provided by StandardProcessScheduler to essentially initiate component's onTrigger() cycle + } else { + LOG.debug("Successfully invoked @OnScheduled methods of {} but scheduled state is no longer STARTING so will stop processor now", processor); + + // can only happen if stopProcessor was called before service was transitioned to RUNNING state + try { + ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnUnscheduled.class, processor, processContext); + } finally { + ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, processor, processContext); + } + + scheduledState.set(ScheduledState.STOPPED); + } + } finally { + schedulingAgentCallback.onTaskComplete(); + } + } catch (final Exception e) { + procLog.error("Failed to properly initialize Processor. If still scheduled to run, NiFi will attempt to " + + "initialize and run the Processor again after the 'Administrative Yield Duration' has elapsed. Failure is due to " + e, e); + + // If processor's task completed Exceptionally, then we want to retry initiating the start (if Processor is still scheduled to run). + try (final NarCloseable nc = NarCloseable.withComponentNarLoader(processor.getClass(), processor.getIdentifier())) { + try { + ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnUnscheduled.class, processor, processContext); + } finally { + ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, processor, processContext); } } - }); - if (scheduledState.compareAndSet(ScheduledState.STARTING, ScheduledState.RUNNING)) { - schedulingAgentCallback.trigger(); // callback provided by StandardProcessScheduler to essentially initiate component's onTrigger() cycle - } else { // can only happen if stopProcessor was called before service was transitioned to RUNNING state - try (final NarCloseable nc = NarCloseable.withComponentNarLoader(processor.getClass(), processor.getIdentifier())) { - ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnUnscheduled.class, processor, processContext); + // make sure we only continue retry loop if STOP action wasn't initiated + if (scheduledState.get() != ScheduledState.STOPPING) { + // re-initiate the entire process + final Runnable initiateStartTask = () -> initiateStart(taskScheduler, administrativeYieldMillis, processContext, schedulingAgentCallback); + taskScheduler.schedule(initiateStartTask, administrativeYieldMillis, TimeUnit.MILLISECONDS); + } else { + scheduledState.set(ScheduledState.STOPPED); } - scheduledState.set(ScheduledState.STOPPED); } - } catch (final Exception e) { - final Throwable cause = e instanceof InvocationTargetException ? e.getCause() : e; - procLog.error("{} failed to invoke @OnScheduled method due to {}; processor will not be scheduled to run for {} seconds", - new Object[]{StandardProcessorNode.this.getProcessor(), cause, administrativeYieldMillis / 1000L}, cause); - LOG.error("Failed to invoke @OnScheduled method due to {}", cause.toString(), cause); - ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnUnscheduled.class, processor, processContext); - ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, processor, processContext); + return null; + }; - if (scheduledState.get() != ScheduledState.STOPPING) { // make sure we only continue retry loop if STOP action wasn't initiated - taskScheduler.schedule(() -> initiateStart(taskScheduler, administrativeYieldMillis, processContext, schedulingAgentCallback), administrativeYieldMillis, TimeUnit.MILLISECONDS); - } else { - scheduledState.set(ScheduledState.STOPPED); + // Trigger the task in a background thread. + final Future taskFuture = schedulingAgentCallback.scheduleTask(startupTask); + + // Trigger a task periodically to check if @OnScheduled task completed. Once it has, + // this task will call SchedulingAgentCallback#onTaskComplete. + // However, if the task times out, we need to be able to cancel the monitoring. So, in order + // to do this, we use #scheduleWithFixedDelay and then make that Future available to the task + // itself by placing it into an AtomicReference. + final AtomicReference> futureRef = new AtomicReference<>(); + final Runnable monitoringTask = new Runnable() { + @Override + public void run() { + Future monitoringFuture = futureRef.get(); + if (monitoringFuture == null) { // Future is not yet available. Just return and wait for the next invocation. + return; + } + + monitorAsyncTask(taskFuture, monitoringFuture, completionTimestamp); } - } + }; + + final Future future = taskScheduler.scheduleWithFixedDelay(monitoringTask, 1, 10, TimeUnit.MILLISECONDS); + futureRef.set(future); } /** @@ -1451,59 +1495,26 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable return future; } - /** - * Will invoke lifecycle operation (OnScheduled or OnUnscheduled) - * asynchronously to ensure that it could be interrupted if stop action was - * initiated on the processor that may be infinitely blocking in such - * operation. While this approach paves the way for further enhancements - * related to managing processor'slife-cycle operation at the moment the - * interrupt will not happen automatically. This is primarily to preserve - * the existing behavior of the NiFi where stop operation can only be - * invoked once the processor is started. Unfortunately that could mean that - * the processor may be blocking indefinitely in lifecycle operation - * (OnScheduled or OnUnscheduled). To deal with that a new NiFi property has - * been introduced nifi.processor.scheduling.timeout which allows one - * to set the time (in milliseconds) of how long to wait before canceling - * such lifecycle operation (OnScheduled or OnUnscheduled) allowing - * processor's stop sequence to proceed. The default value for this property - * is {@link Long#MAX_VALUE}. - *

- * NOTE: Canceling the task does not guarantee that the task will actually - * completes (successfully or otherwise), since cancellation of the task - * will issue a simple Thread.interrupt(). However code inside of lifecycle - * operation (OnScheduled or OnUnscheduled) is written purely and will - * ignore thread interrupts you may end up with runaway thread which may - * eventually require NiFi reboot. In any event, the above explanation will - * be logged (WARN) informing a user so further actions could be taken. - *

- */ - private void invokeTaskAsCancelableFuture(final SchedulingAgentCallback callback, final Callable task) { - final Processor processor = processorRef.get().getProcessor(); - final String timeoutString = nifiProperties.getProperty(NiFiProperties.PROCESSOR_SCHEDULING_TIMEOUT); - final long onScheduleTimeout = timeoutString == null ? 60000 - : FormatUtils.getTimeDuration(timeoutString.trim(), TimeUnit.MILLISECONDS); - final Future taskFuture = callback.scheduleTask(task); - try { - taskFuture.get(onScheduleTimeout, TimeUnit.MILLISECONDS); - } catch (final InterruptedException e) { - LOG.warn("Thread was interrupted while waiting for processor '" + processor.getClass().getSimpleName() - + "' lifecycle OnScheduled operation to finish."); - Thread.currentThread().interrupt(); - throw new RuntimeException("Interrupted while executing one of processor's OnScheduled tasks.", e); - } catch (final TimeoutException e) { + + private void monitorAsyncTask(final Future taskFuture, final Future monitoringFuture, final long completionTimestamp) { + if (taskFuture.isDone()) { + monitoringFuture.cancel(false); // stop scheduling this task + } else if (System.currentTimeMillis() > completionTimestamp) { + // Task timed out. Request an interrupt of the processor task taskFuture.cancel(true); - LOG.warn("Timed out while waiting for OnScheduled of '" - + processor.getClass().getSimpleName() - + "' processor to finish. An attempt is made to cancel the task via Thread.interrupt(). However it does not " - + "guarantee that the task will be canceled since the code inside current OnScheduled operation may " + + // Stop monitoring the processor. We have interrupted the thread so that's all we can do. If the processor responds to the interrupt, then + // it will be re-scheduled. If it does not, then it will either keep the thread indefinitely or eventually finish, at which point + // the Processor will begin running. + monitoringFuture.cancel(false); + + final Processor processor = processorRef.get().getProcessor(); + LOG.warn("Timed out while waiting for OnScheduled of " + + processor + " to finish. An attempt is made to cancel the task via Thread.interrupt(). However it does not " + + "guarantee that the task will be canceled since the code inside current OnScheduled operation may " + "have been written to ignore interrupts which may result in a runaway thread. This could lead to more issues, " - + "eventually requiring NiFi to be restarted. This is usually a bug in the target Processor '" - + processor + "' that needs to be documented, reported and eventually fixed."); - throw new RuntimeException("Timed out while executing one of processor's OnScheduled task.", e); - } catch (final ExecutionException e){ - throw new RuntimeException("Failed while executing one of processor's OnScheduled task.", e); - } finally { - callback.onTaskComplete(); + + "eventually requiring NiFi to be restarted. This is usually a bug in the target Processor '" + + processor + "' that needs to be documented, reported and eventually fixed."); } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/exception/ComponentStartTimeoutException.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/exception/ComponentStartTimeoutException.java new file mode 100644 index 0000000000..cca8698c3f --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/exception/ComponentStartTimeoutException.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.controller.exception; + +public class ComponentStartTimeoutException extends Exception { + public ComponentStartTimeoutException(String message) { + super(message); + } +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java index d08d701816..1155bfe11b 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java @@ -77,19 +77,21 @@ public final class StandardProcessScheduler implements ProcessScheduler { private final ConcurrentMap scheduleStates = new ConcurrentHashMap<>(); private final ScheduledExecutorService frameworkTaskExecutor; private final ConcurrentMap strategyAgentMap = new ConcurrentHashMap<>(); - // thread pool for starting/stopping components - private final ScheduledExecutorService componentLifeCycleThreadPool = new FlowEngine(8, "StandardProcessScheduler", true); - private final ScheduledExecutorService componentMonitoringThreadPool = new FlowEngine(8, "StandardProcessScheduler", true); + // thread pool for starting/stopping components + private final ScheduledExecutorService componentLifeCycleThreadPool; + private final ScheduledExecutorService componentMonitoringThreadPool = new FlowEngine(2, "Monitor Processore Lifecycle", true); private final StringEncryptor encryptor; public StandardProcessScheduler( + final FlowEngine componentLifecycleThreadPool, final ControllerServiceProvider controllerServiceProvider, final StringEncryptor encryptor, final StateManagerProvider stateManagerProvider, final NiFiProperties nifiProperties ) { + this.componentLifeCycleThreadPool = componentLifecycleThreadPool; this.controllerServiceProvider = controllerServiceProvider; this.encryptor = encryptor; this.stateManagerProvider = stateManagerProvider; @@ -164,7 +166,6 @@ public final class StandardProcessScheduler implements ProcessScheduler { frameworkTaskExecutor.shutdown(); componentLifeCycleThreadPool.shutdown(); - componentMonitoringThreadPool.shutdown(); } @Override @@ -313,7 +314,7 @@ public final class StandardProcessScheduler implements ProcessScheduler { @Override public Future scheduleTask(Callable task) { scheduleState.incrementActiveThreadCount(); - return componentMonitoringThreadPool.submit(task); + return componentLifeCycleThreadPool.submit(task); } @Override @@ -323,7 +324,7 @@ public final class StandardProcessScheduler implements ProcessScheduler { }; LOG.info("Starting {}", procNode); - procNode.start(this.componentLifeCycleThreadPool, this.administrativeYieldMillis, processContext, callback, failIfStopping); + procNode.start(this.componentMonitoringThreadPool, this.administrativeYieldMillis, processContext, callback, failIfStopping); return future; } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardProcessorNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardProcessorNode.java index 7b3996347e..9bff6f6fd4 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardProcessorNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardProcessorNode.java @@ -100,7 +100,7 @@ public class TestStandardProcessorNode { final LoggableComponent loggableComponent = new LoggableComponent<>(processor, coordinate, null); final StandardProcessorNode procNode = new StandardProcessorNode(loggableComponent, uuid, createValidationContextFactory(), null, null, NiFiProperties.createBasicNiFiProperties(null, null), new StandardComponentVariableRegistry(VariableRegistry.EMPTY_REGISTRY), reloadComponent); - final ScheduledExecutorService taskScheduler = new FlowEngine(2, "TestClasspathResources", true); + final ScheduledExecutorService taskScheduler = new FlowEngine(1, "TestClasspathResources", true); final StandardProcessContext processContext = new StandardProcessContext(procNode, null, null, null); final SchedulingAgentCallback schedulingAgentCallback = new SchedulingAgentCallback() { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/LongEnablingService.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/LongEnablingService.java new file mode 100644 index 0000000000..09b824a1d8 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/LongEnablingService.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.controller.scheduling; + +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.nifi.annotation.lifecycle.OnDisabled; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; + +public class LongEnablingService extends AbstractControllerService { + + private final AtomicInteger enableCounter = new AtomicInteger(); + private final AtomicInteger disableCounter = new AtomicInteger(); + + private volatile long limit; + + @OnEnabled + public void enable(final ConfigurationContext context) throws Exception { + this.enableCounter.incrementAndGet(); + Thread.sleep(limit); + } + + @OnDisabled + public void disable(final ConfigurationContext context) { + this.disableCounter.incrementAndGet(); + } + + public int enableInvocationCount() { + return this.enableCounter.get(); + } + + public int disableInvocationCount() { + return this.disableCounter.get(); + } + + public void setLimit(final long limit) { + this.limit = limit; + } +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/StandardProcessSchedulerIT.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/StandardProcessSchedulerIT.java new file mode 100644 index 0000000000..2d7b22f8da --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/StandardProcessSchedulerIT.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.controller.scheduling; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.util.Collections; + +import org.apache.nifi.bundle.Bundle; +import org.apache.nifi.components.state.StateManagerProvider; +import org.apache.nifi.controller.FlowController; +import org.apache.nifi.controller.service.ControllerServiceNode; +import org.apache.nifi.controller.service.ControllerServiceState; +import org.apache.nifi.controller.service.StandardControllerServiceProvider; +import org.apache.nifi.engine.FlowEngine; +import org.apache.nifi.nar.ExtensionManager; +import org.apache.nifi.nar.SystemBundle; +import org.apache.nifi.registry.VariableRegistry; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.util.NiFiProperties; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +public class StandardProcessSchedulerIT { + private final StateManagerProvider stateMgrProvider = Mockito.mock(StateManagerProvider.class); + private VariableRegistry variableRegistry = VariableRegistry.ENVIRONMENT_SYSTEM_REGISTRY; + private FlowController controller; + private NiFiProperties nifiProperties; + private Bundle systemBundle; + private volatile String propsFile = TestStandardProcessScheduler.class.getResource("/standardprocessschedulertest.nifi.properties").getFile(); + + @Before + public void setup() throws InitializationException { + this.nifiProperties = NiFiProperties.createBasicNiFiProperties(propsFile, null); + + // load the system bundle + systemBundle = SystemBundle.create(nifiProperties); + ExtensionManager.discoverExtensions(systemBundle, Collections.emptySet()); + + controller = Mockito.mock(FlowController.class); + } + + /** + * Validates that the service that is currently in ENABLING state can be + * disabled and that its @OnDisabled operation will be invoked as soon as + * + * @OnEnable finishes. + */ + @Test + public void validateLongEnablingServiceCanStillBeDisabled() throws Exception { + final StandardProcessScheduler scheduler = new StandardProcessScheduler(new FlowEngine(1, "Unit Test", true), null, null, stateMgrProvider, nifiProperties); + final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, scheduler, null, stateMgrProvider, variableRegistry, nifiProperties); + final ControllerServiceNode serviceNode = provider.createControllerService(LongEnablingService.class.getName(), + "1", systemBundle.getBundleDetails().getCoordinate(), null, false); + final LongEnablingService ts = (LongEnablingService) serviceNode.getControllerServiceImplementation(); + ts.setLimit(3000); + scheduler.enableControllerService(serviceNode); + Thread.sleep(2000); + assertTrue(serviceNode.isActive()); + assertEquals(1, ts.enableInvocationCount()); + + Thread.sleep(500); + scheduler.disableControllerService(serviceNode); + assertFalse(serviceNode.isActive()); + assertEquals(ControllerServiceState.DISABLING, serviceNode.getState()); + assertEquals(0, ts.disableInvocationCount()); + // wait a bit. . . Enabling will finish and @OnDisabled will be invoked + // automatically + Thread.sleep(4000); + assertEquals(ControllerServiceState.DISABLED, serviceNode.getState()); + assertEquals(1, ts.disableInvocationCount()); + } + +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java index 1b54c6470b..d459f5ca9b 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java @@ -339,7 +339,7 @@ public class TestProcessorLifecycle { TestProcessor testProcessor = (TestProcessor) testProcNode.getProcessor(); // sets the scenario for the processor to run - int delay = 2000; + int delay = 200; this.longRunningOnSchedule(testProcessor, delay); ProcessScheduler ps = fc.getProcessScheduler(); @@ -348,9 +348,10 @@ public class TestProcessorLifecycle { ps.stopProcessor(testProcNode); assertCondition(() -> ScheduledState.STOPPED == testProcNode.getScheduledState(), 5000L); - assertCondition(() -> testProcessor.operationNames.size() == 2, 8000L); + assertCondition(() -> testProcessor.operationNames.size() == 3, 8000L); assertEquals("@OnScheduled", testProcessor.operationNames.get(0)); assertEquals("@OnUnscheduled", testProcessor.operationNames.get(1)); + assertEquals("@OnStopped", testProcessor.operationNames.get(2)); } /** @@ -442,7 +443,7 @@ public class TestProcessorLifecycle { */ @Test public void validateProcessorCanBeStoppedWhenOnScheduledBlocksIndefinitelyUninterruptable() throws Exception { - final FlowControllerAndSystemBundle fcsb = this.buildFlowControllerForTest(NiFiProperties.PROCESSOR_SCHEDULING_TIMEOUT, "5 sec"); + final FlowControllerAndSystemBundle fcsb = this.buildFlowControllerForTest(NiFiProperties.PROCESSOR_SCHEDULING_TIMEOUT, "1 sec"); fc = fcsb.getFlowController(); ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString()); @@ -462,7 +463,7 @@ public class TestProcessorLifecycle { } /** - * Validates that processor can be stopped if onTrigger() keeps trowing + * Validates that processor can be stopped if onTrigger() keeps throwing * exceptions. */ @Test @@ -593,7 +594,7 @@ public class TestProcessorLifecycle { testProcNodeB.setProperties(properties); testGroup.addProcessor(testProcNodeB); - Collection relationNames = new ArrayList(); + Collection relationNames = new ArrayList<>(); relationNames.add("relation"); Connection connection = fc.createConnection(UUID.randomUUID().toString(), Connection.class.getName(), testProcNodeA, testProcNodeB, relationNames); testGroup.addConnection(connection); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java index c0b36c91d1..2497ac7cab 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java @@ -55,12 +55,14 @@ import org.apache.nifi.controller.ValidationContextFactory; import org.apache.nifi.controller.cluster.Heartbeater; import org.apache.nifi.controller.reporting.StandardReportingInitializationContext; import org.apache.nifi.controller.reporting.StandardReportingTaskNode; +import org.apache.nifi.controller.scheduling.processors.FailOnScheduledProcessor; import org.apache.nifi.controller.service.ControllerServiceNode; import org.apache.nifi.controller.service.ControllerServiceProvider; import org.apache.nifi.controller.service.ControllerServiceState; import org.apache.nifi.controller.service.StandardControllerServiceNode; import org.apache.nifi.controller.service.StandardControllerServiceProvider; import org.apache.nifi.controller.service.mock.MockProcessGroup; +import org.apache.nifi.engine.FlowEngine; import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.nar.ExtensionManager; @@ -104,13 +106,16 @@ public class TestStandardProcessScheduler { @Before public void setup() throws InitializationException { - this.nifiProperties = NiFiProperties.createBasicNiFiProperties(propsFile, null); + final Map overrideProperties = new HashMap<>(); + overrideProperties.put(NiFiProperties.ADMINISTRATIVE_YIELD_DURATION, "2 millis"); + overrideProperties.put(NiFiProperties.PROCESSOR_SCHEDULING_TIMEOUT, "10 millis"); + this.nifiProperties = NiFiProperties.createBasicNiFiProperties(propsFile, overrideProperties); // load the system bundle systemBundle = SystemBundle.create(nifiProperties); ExtensionManager.discoverExtensions(systemBundle, Collections.emptySet()); - scheduler = new StandardProcessScheduler(Mockito.mock(ControllerServiceProvider.class), null, stateMgrProvider, nifiProperties); + scheduler = new StandardProcessScheduler(new FlowEngine(1, "Unit Test", true), Mockito.mock(ControllerServiceProvider.class), null, stateMgrProvider, nifiProperties); scheduler.setSchedulingAgent(SchedulingStrategy.TIMER_DRIVEN, Mockito.mock(SchedulingAgent.class)); reportingTask = new TestReportingTask(); @@ -167,13 +172,13 @@ public class TestStandardProcessScheduler { scheduler.schedule(taskNode); // Let it try to run a few times. - Thread.sleep(1000L); + Thread.sleep(25L); scheduler.unschedule(taskNode); final int attempts = reportingTask.onScheduleAttempts.get(); // give it a sec to make sure that it's finished running. - Thread.sleep(1500L); + Thread.sleep(250L); final int attemptsAfterStop = reportingTask.onScheduleAttempts.get() - attempts; // allow 1 extra run, due to timing issues that could call it as it's being stopped. @@ -207,7 +212,7 @@ public class TestStandardProcessScheduler { scheduler.enableControllerService(service); scheduler.startProcessor(procNode, true); - Thread.sleep(1000L); + Thread.sleep(25L); scheduler.stopProcessor(procNode); assertTrue(service.isActive()); @@ -215,7 +220,10 @@ public class TestStandardProcessScheduler { scheduler.disableControllerService(service); assertTrue(service.getState() == ControllerServiceState.DISABLING); assertFalse(service.isActive()); - Thread.sleep(2000); + + while (service.getState() != ControllerServiceState.DISABLED) { + Thread.sleep(5L); + } assertTrue(service.getState() == ControllerServiceState.DISABLED); } @@ -472,37 +480,87 @@ public class TestStandardProcessScheduler { assertEquals(0, ts.disableInvocationCount()); } - /** - * Validates that the service that is currently in ENABLING state can be - * disabled and that its @OnDisabled operation will be invoked as soon as - * - * @OnEnable finishes. - */ - @Test - public void validateLongEnablingServiceCanStillBeDisabled() throws Exception { - final StandardProcessScheduler scheduler = createScheduler(); - final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, scheduler, null, stateMgrProvider, variableRegistry, nifiProperties); - final ControllerServiceNode serviceNode = provider.createControllerService(LongEnablingService.class.getName(), - "1", systemBundle.getBundleDetails().getCoordinate(), null, false); - final LongEnablingService ts = (LongEnablingService) serviceNode.getControllerServiceImplementation(); - ts.setLimit(3000); - scheduler.enableControllerService(serviceNode); - Thread.sleep(2000); - assertTrue(serviceNode.isActive()); - assertEquals(1, ts.enableInvocationCount()); + // Test that if processor throws Exception in @OnScheduled, it keeps getting scheduled + @Test(timeout = 10000) + public void testProcessorThrowsExceptionOnScheduledRetry() throws InterruptedException { + final FailOnScheduledProcessor proc = new FailOnScheduledProcessor(); + proc.setDesiredFailureCount(3); - Thread.sleep(500); - scheduler.disableControllerService(serviceNode); - assertFalse(serviceNode.isActive()); - assertEquals(ControllerServiceState.DISABLING, serviceNode.getState()); - assertEquals(0, ts.disableInvocationCount()); - // wait a bit. . . Enabling will finish and @OnDisabled will be invoked - // automatically - Thread.sleep(4000); - assertEquals(ControllerServiceState.DISABLED, serviceNode.getState()); - assertEquals(1, ts.disableInvocationCount()); + proc.initialize(new StandardProcessorInitializationContext(UUID.randomUUID().toString(), null, null, null, nifiProperties)); + final ReloadComponent reloadComponent = Mockito.mock(ReloadComponent.class); + final LoggableComponent loggableComponent = new LoggableComponent<>(proc, systemBundle.getBundleDetails().getCoordinate(), null); + + final ProcessorNode procNode = new StandardProcessorNode(loggableComponent, UUID.randomUUID().toString(), + new StandardValidationContextFactory(controller, variableRegistry), + scheduler, controller, nifiProperties, new StandardComponentVariableRegistry(VariableRegistry.EMPTY_REGISTRY), reloadComponent); + + rootGroup.addProcessor(procNode); + + scheduler.startProcessor(procNode, true); + while (!proc.isSucceess()) { + Thread.sleep(5L); + } + + assertEquals(3, proc.getOnScheduledInvocationCount()); } + // Test that if processor times out in the @OnScheduled but responds to interrupt, it keeps getting scheduled + @Test(timeout = 1000000) + public void testProcessorTimeOutRespondsToInterrupt() throws InterruptedException { + final FailOnScheduledProcessor proc = new FailOnScheduledProcessor(); + proc.setDesiredFailureCount(0); + proc.setOnScheduledSleepDuration(20, TimeUnit.MINUTES, true, 1); + + proc.initialize(new StandardProcessorInitializationContext(UUID.randomUUID().toString(), null, null, null, nifiProperties)); + final ReloadComponent reloadComponent = Mockito.mock(ReloadComponent.class); + final LoggableComponent loggableComponent = new LoggableComponent<>(proc, systemBundle.getBundleDetails().getCoordinate(), null); + + final ProcessorNode procNode = new StandardProcessorNode(loggableComponent, UUID.randomUUID().toString(), + new StandardValidationContextFactory(controller, variableRegistry), + scheduler, controller, nifiProperties, new StandardComponentVariableRegistry(VariableRegistry.EMPTY_REGISTRY), reloadComponent); + + rootGroup.addProcessor(procNode); + + scheduler.startProcessor(procNode, true); + while (!proc.isSucceess()) { + Thread.sleep(5L); + } + + // The first time that the processor's @OnScheduled method is called, it will sleep for 20 minutes. The scheduler should interrupt + // that thread and then try again. The second time, the Processor will not sleep because setOnScheduledSleepDuration was called + // above with iterations = 1 + assertEquals(2, proc.getOnScheduledInvocationCount()); + } + + // Test that if processor times out in the @OnScheduled and does not respond to interrupt, it is not scheduled again + @Test(timeout = 10000) + public void testProcessorTimeOutNoResponseToInterrupt() throws InterruptedException { + final FailOnScheduledProcessor proc = new FailOnScheduledProcessor(); + proc.setDesiredFailureCount(0); + proc.setOnScheduledSleepDuration(20, TimeUnit.MINUTES, false, 1); + + proc.initialize(new StandardProcessorInitializationContext(UUID.randomUUID().toString(), null, null, null, nifiProperties)); + final ReloadComponent reloadComponent = Mockito.mock(ReloadComponent.class); + final LoggableComponent loggableComponent = new LoggableComponent<>(proc, systemBundle.getBundleDetails().getCoordinate(), null); + + final ProcessorNode procNode = new StandardProcessorNode(loggableComponent, UUID.randomUUID().toString(), + new StandardValidationContextFactory(controller, variableRegistry), + scheduler, controller, nifiProperties, new StandardComponentVariableRegistry(VariableRegistry.EMPTY_REGISTRY), reloadComponent); + + rootGroup.addProcessor(procNode); + + scheduler.startProcessor(procNode, true); + + Thread.sleep(100L); + assertEquals(1, proc.getOnScheduledInvocationCount()); + Thread.sleep(100L); + assertEquals(1, proc.getOnScheduledInvocationCount()); + + // Allow test to complete. + proc.setAllowSleepInterrupt(true); + } + + public static class FailingService extends AbstractControllerService { @OnEnabled @@ -512,7 +570,6 @@ public class TestStandardProcessScheduler { } public static class RandomShortDelayEnablingService extends AbstractControllerService { - private final Random random = new Random(); @OnEnabled @@ -526,7 +583,6 @@ public class TestStandardProcessScheduler { } public static class SimpleTestService extends AbstractControllerService { - private final AtomicInteger enableCounter = new AtomicInteger(); private final AtomicInteger disableCounter = new AtomicInteger(); @@ -549,38 +605,7 @@ public class TestStandardProcessScheduler { } } - public static class LongEnablingService extends AbstractControllerService { - - private final AtomicInteger enableCounter = new AtomicInteger(); - private final AtomicInteger disableCounter = new AtomicInteger(); - - private volatile long limit; - - @OnEnabled - public void enable(final ConfigurationContext context) throws Exception { - this.enableCounter.incrementAndGet(); - Thread.sleep(limit); - } - - @OnDisabled - public void disable(final ConfigurationContext context) { - this.disableCounter.incrementAndGet(); - } - - public int enableInvocationCount() { - return this.enableCounter.get(); - } - - public int disableInvocationCount() { - return this.disableCounter.get(); - } - - public void setLimit(final long limit) { - this.limit = limit; - } - } - private StandardProcessScheduler createScheduler() { - return new StandardProcessScheduler(null, null, stateMgrProvider, nifiProperties); + return new StandardProcessScheduler(new FlowEngine(1, "Unit Test", true), null, null, stateMgrProvider, nifiProperties); } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/processors/FailOnScheduledProcessor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/processors/FailOnScheduledProcessor.java new file mode 100644 index 0000000000..acfe39060e --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/processors/FailOnScheduledProcessor.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.controller.scheduling.processors; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.exception.ProcessException; + +public class FailOnScheduledProcessor extends AbstractProcessor { + + private volatile int invocationCount = 0; + private volatile int desiredFailureCount = 1; + private volatile long onScheduledSleepMillis = 0L; + private volatile int onScheduledSleepIterations = 0; + private volatile boolean allowSleepInterrupt = true; + private final AtomicBoolean succeeded = new AtomicBoolean(); + + public void setDesiredFailureCount(final int desiredFailureCount) { + this.desiredFailureCount = desiredFailureCount; + } + + public void setOnScheduledSleepDuration(final long duration, final TimeUnit unit, final boolean allowInterrupt, final int iterations) { + this.onScheduledSleepMillis = unit.toMillis(duration); + this.onScheduledSleepIterations = iterations; + this.allowSleepInterrupt = allowInterrupt; + } + + public void setAllowSleepInterrupt(final boolean allow) { + this.allowSleepInterrupt = allow; + } + + @OnScheduled + public void onScheduled() throws InterruptedException { + invocationCount++; + + if (invocationCount <= onScheduledSleepIterations && onScheduledSleepMillis > 0L) { + final long sleepFinish = System.currentTimeMillis() + onScheduledSleepMillis; + + while (System.currentTimeMillis() < sleepFinish) { + try { + Thread.sleep(Math.max(0, sleepFinish - System.currentTimeMillis())); + } catch (final InterruptedException ie) { + if (allowSleepInterrupt) { + Thread.currentThread().interrupt(); + throw ie; + } else { + continue; + } + } + } + } + + if (invocationCount < desiredFailureCount) { + throw new ProcessException("Intentional failure for unit test"); + } else { + succeeded.set(true); + } + } + + public int getOnScheduledInvocationCount() { + return invocationCount; + } + + public boolean isSucceess() { + return succeeded.get(); + } + + @Override + public void onTrigger(org.apache.nifi.processor.ProcessContext context, org.apache.nifi.processor.ProcessSession session) throws ProcessException { + } +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/StandardControllerServiceProviderIT.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/StandardControllerServiceProviderIT.java new file mode 100644 index 0000000000..015ae67fb8 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/StandardControllerServiceProviderIT.java @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.controller.service; + +import static org.junit.Assert.assertTrue; + +import java.beans.PropertyDescriptor; +import java.util.Collections; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Set; + +import org.apache.nifi.bundle.Bundle; +import org.apache.nifi.components.state.StateManager; +import org.apache.nifi.components.state.StateManagerProvider; +import org.apache.nifi.controller.FlowController; +import org.apache.nifi.controller.scheduling.StandardProcessScheduler; +import org.apache.nifi.controller.service.mock.MockProcessGroup; +import org.apache.nifi.controller.service.mock.ServiceA; +import org.apache.nifi.controller.service.mock.ServiceB; +import org.apache.nifi.engine.FlowEngine; +import org.apache.nifi.groups.ProcessGroup; +import org.apache.nifi.nar.ExtensionManager; +import org.apache.nifi.nar.SystemBundle; +import org.apache.nifi.registry.VariableRegistry; +import org.apache.nifi.util.NiFiProperties; +import org.junit.BeforeClass; +import org.junit.Test; +import org.mockito.Mockito; + +public class StandardControllerServiceProviderIT { + private static Bundle systemBundle; + private static NiFiProperties niFiProperties; + private static VariableRegistry variableRegistry = VariableRegistry.ENVIRONMENT_SYSTEM_REGISTRY; + + private static StateManagerProvider stateManagerProvider = new StateManagerProvider() { + @Override + public StateManager getStateManager(final String componentId) { + return Mockito.mock(StateManager.class); + } + + @Override + public void shutdown() { + } + + @Override + public void enableClusterProvider() { + } + + @Override + public void disableClusterProvider() { + } + + @Override + public void onComponentRemoved(final String componentId) { + } + }; + + @BeforeClass + public static void setNiFiProps() { + System.setProperty(NiFiProperties.PROPERTIES_FILE_PATH, TestStandardControllerServiceProvider.class.getResource("/conf/nifi.properties").getFile()); + niFiProperties = NiFiProperties.createBasicNiFiProperties(null, null); + + // load the system bundle + systemBundle = SystemBundle.create(niFiProperties); + ExtensionManager.discoverExtensions(systemBundle, Collections.emptySet()); + } + + /** + * We run the same test 1000 times and prior to bug fix (see NIFI-1143) it + * would fail on some iteration. For more details please see + * {@link PropertyDescriptor}.isDependentServiceEnableable() as well as + * https://issues.apache.org/jira/browse/NIFI-1143 + */ + @Test(timeout = 120000) + public void testConcurrencyWithEnablingReferencingServicesGraph() throws InterruptedException { + final StandardProcessScheduler scheduler = new StandardProcessScheduler(new FlowEngine(1, "Unit Test", true), null, null, stateManagerProvider, niFiProperties); + for (int i = 0; i < 5000; i++) { + testEnableReferencingServicesGraph(scheduler); + } + } + + public void testEnableReferencingServicesGraph(final StandardProcessScheduler scheduler) { + final FlowController controller = Mockito.mock(FlowController.class); + final ProcessGroup procGroup = new MockProcessGroup(controller); + Mockito.when(controller.getGroup(Mockito.anyString())).thenReturn(procGroup); + + final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, scheduler, null, stateManagerProvider, variableRegistry, niFiProperties); + + // build a graph of controller services with dependencies as such: + // + // A -> B -> D + // C ---^----^ + // + // In other words, A references B, which references D. + // AND + // C references B and D. + // + // So we have to verify that if D is enabled, when we enable its referencing services, + // we enable C and B, even if we attempt to enable C before B... i.e., if we try to enable C, we cannot do so + // until B is first enabled so ensure that we enable B first. + final ControllerServiceNode serviceNode1 = provider.createControllerService(ServiceA.class.getName(), "1", + systemBundle.getBundleDetails().getCoordinate(), null, false); + final ControllerServiceNode serviceNode2 = provider.createControllerService(ServiceA.class.getName(), "2", + systemBundle.getBundleDetails().getCoordinate(), null, false); + final ControllerServiceNode serviceNode3 = provider.createControllerService(ServiceA.class.getName(), "3", + systemBundle.getBundleDetails().getCoordinate(), null, false); + final ControllerServiceNode serviceNode4 = provider.createControllerService(ServiceB.class.getName(), "4", + systemBundle.getBundleDetails().getCoordinate(), null, false); + + procGroup.addControllerService(serviceNode1); + procGroup.addControllerService(serviceNode2); + procGroup.addControllerService(serviceNode3); + procGroup.addControllerService(serviceNode4); + + setProperty(serviceNode1, ServiceA.OTHER_SERVICE.getName(), "2"); + setProperty(serviceNode2, ServiceA.OTHER_SERVICE.getName(), "4"); + setProperty(serviceNode3, ServiceA.OTHER_SERVICE.getName(), "2"); + setProperty(serviceNode3, ServiceA.OTHER_SERVICE_2.getName(), "4"); + + provider.enableControllerService(serviceNode4); + provider.enableReferencingServices(serviceNode4); + + // Verify that the services are either ENABLING or ENABLED, and wait for all of them to become ENABLED. + // Note that we set a timeout of 10 seconds, in case a bug occurs and the services never become ENABLED. + final Set validStates = new HashSet<>(); + validStates.add(ControllerServiceState.ENABLED); + validStates.add(ControllerServiceState.ENABLING); + + while (serviceNode3.getState() != ControllerServiceState.ENABLED || serviceNode2.getState() != ControllerServiceState.ENABLED || serviceNode1.getState() != ControllerServiceState.ENABLED) { + assertTrue(validStates.contains(serviceNode3.getState())); + assertTrue(validStates.contains(serviceNode2.getState())); + assertTrue(validStates.contains(serviceNode1.getState())); + } + } + + private void setProperty(ControllerServiceNode serviceNode, String propName, String propValue) { + Map props = new LinkedHashMap<>(); + props.put(propName, propValue); + serviceNode.setProperties(props); + } +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java index ed335e978f..fac04a6bd1 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java @@ -21,14 +21,11 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; -import java.beans.PropertyDescriptor; import java.util.Arrays; import java.util.Collections; -import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -48,6 +45,7 @@ import org.apache.nifi.controller.service.mock.MockProcessGroup; import org.apache.nifi.controller.service.mock.ServiceA; import org.apache.nifi.controller.service.mock.ServiceB; import org.apache.nifi.controller.service.mock.ServiceC; +import org.apache.nifi.engine.FlowEngine; import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.groups.StandardProcessGroup; import org.apache.nifi.nar.ExtensionManager; @@ -130,7 +128,7 @@ public class TestStandardControllerServiceProvider { } private StandardProcessScheduler createScheduler() { - return new StandardProcessScheduler(null, null, stateManagerProvider, niFiProperties); + return new StandardProcessScheduler(new FlowEngine(1, "Unit Test", true), null, null, stateManagerProvider, niFiProperties); } private void setProperty(ControllerServiceNode serviceNode, String propName, String propValue) { @@ -205,75 +203,6 @@ public class TestStandardControllerServiceProvider { } } - /** - * We run the same test 1000 times and prior to bug fix (see NIFI-1143) it - * would fail on some iteration. For more details please see - * {@link PropertyDescriptor}.isDependentServiceEnableable() as well as - * https://issues.apache.org/jira/browse/NIFI-1143 - */ - @Test(timeout = 120000) - public void testConcurrencyWithEnablingReferencingServicesGraph() throws InterruptedException { - final StandardProcessScheduler scheduler = createScheduler(); - for (int i = 0; i < 5000; i++) { - testEnableReferencingServicesGraph(scheduler); - } - } - - public void testEnableReferencingServicesGraph(final StandardProcessScheduler scheduler) { - final ProcessGroup procGroup = new MockProcessGroup(controller); - final FlowController controller = Mockito.mock(FlowController.class); - Mockito.when(controller.getGroup(Mockito.anyString())).thenReturn(procGroup); - - final StandardControllerServiceProvider provider = - new StandardControllerServiceProvider(controller, scheduler, null, stateManagerProvider, variableRegistry, niFiProperties); - - // build a graph of controller services with dependencies as such: - // - // A -> B -> D - // C ---^----^ - // - // In other words, A references B, which references D. - // AND - // C references B and D. - // - // So we have to verify that if D is enabled, when we enable its referencing services, - // we enable C and B, even if we attempt to enable C before B... i.e., if we try to enable C, we cannot do so - // until B is first enabled so ensure that we enable B first. - final ControllerServiceNode serviceNode1 = provider.createControllerService(ServiceA.class.getName(), "1", - systemBundle.getBundleDetails().getCoordinate(), null, false); - final ControllerServiceNode serviceNode2 = provider.createControllerService(ServiceA.class.getName(), "2", - systemBundle.getBundleDetails().getCoordinate(), null, false); - final ControllerServiceNode serviceNode3 = provider.createControllerService(ServiceA.class.getName(), "3", - systemBundle.getBundleDetails().getCoordinate(), null, false); - final ControllerServiceNode serviceNode4 = provider.createControllerService(ServiceB.class.getName(), "4", - systemBundle.getBundleDetails().getCoordinate(), null, false); - - procGroup.addControllerService(serviceNode1); - procGroup.addControllerService(serviceNode2); - procGroup.addControllerService(serviceNode3); - procGroup.addControllerService(serviceNode4); - - setProperty(serviceNode1, ServiceA.OTHER_SERVICE.getName(), "2"); - setProperty(serviceNode2, ServiceA.OTHER_SERVICE.getName(), "4"); - setProperty(serviceNode3, ServiceA.OTHER_SERVICE.getName(), "2"); - setProperty(serviceNode3, ServiceA.OTHER_SERVICE_2.getName(), "4"); - - provider.enableControllerService(serviceNode4); - provider.enableReferencingServices(serviceNode4); - - // Verify that the services are either ENABLING or ENABLED, and wait for all of them to become ENABLED. - // Note that we set a timeout of 10 seconds, in case a bug occurs and the services never become ENABLED. - final Set validStates = new HashSet<>(); - validStates.add(ControllerServiceState.ENABLED); - validStates.add(ControllerServiceState.ENABLING); - - while (serviceNode3.getState() != ControllerServiceState.ENABLED || serviceNode2.getState() != ControllerServiceState.ENABLED || serviceNode1.getState() != ControllerServiceState.ENABLED) { - assertTrue(validStates.contains(serviceNode3.getState())); - assertTrue(validStates.contains(serviceNode2.getState())); - assertTrue(validStates.contains(serviceNode1.getState())); - } - } - @Test public void testOrderingOfServices() { final ProcessGroup procGroup = new MockProcessGroup(controller);