From 8da7327188ebdb3cbadda257429e4967be07bf77 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Tue, 23 Jun 2015 16:23:01 -0400 Subject: [PATCH] NIFI-720: Ensure that if Reporting Task stopped while @OnScheduled method is failing that it does not start running when configuration is fixed --- .../scheduling/StandardProcessScheduler.java | 76 ++++++++----- .../TestStandardProcessScheduler.java | 107 ++++++++++++++++++ .../src/test/resources/nifi.properties | 2 +- 3 files changed, 155 insertions(+), 30 deletions(-) create mode 100644 nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java index cf644ed734..d976bd02fe 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java @@ -174,31 +174,45 @@ public final class StandardProcessScheduler implements ProcessScheduler { @SuppressWarnings("deprecation") @Override public void run() { + final long lastStopTime = scheduleState.getLastStopTime(); + final ReportingTask reportingTask = taskNode.getReportingTask(); + // Continually attempt to start the Reporting Task, and if we fail sleep for a bit each time. while (true) { - final ReportingTask reportingTask = taskNode.getReportingTask(); - try { - try (final NarCloseable x = NarCloseable.withNarLoader()) { - ReflectionUtils.invokeMethodsWithAnnotations(OnScheduled.class, OnConfigured.class, reportingTask, taskNode.getConfigurationContext()); - } + synchronized (scheduleState) { + // if no longer scheduled to run, then we're finished. This can happen, for example, + // if the @OnScheduled method throws an Exception and the user stops the reporting task + // while we're administratively yielded. + // we also check if the schedule state's last start time is equal to what it was before. + // if not, then means that the reporting task has been stopped and started again, so we should just + // bail; another thread will be responsible for invoking the @OnScheduled methods. + if (!scheduleState.isScheduled() || scheduleState.getLastStopTime() != lastStopTime) { + return; + } - break; + try (final NarCloseable x = NarCloseable.withNarLoader()) { + ReflectionUtils.invokeMethodsWithAnnotations(OnScheduled.class, OnConfigured.class, reportingTask, taskNode.getConfigurationContext()); + } + + agent.schedule(taskNode, scheduleState); + return; + } } catch (final Exception e) { final Throwable cause = e instanceof InvocationTargetException ? e.getCause() : e; final ComponentLog componentLog = new SimpleProcessLogger(reportingTask.getIdentifier(), reportingTask); componentLog.error("Failed to invoke @OnEnabled method due to {}", cause); - LOG.error("Failed to invoke the On-Scheduled Lifecycle methods of {} due to {}; administratively yielding this ReportingTask and will attempt to schedule it again after {}", - new Object[]{reportingTask, e.toString(), administrativeYieldDuration}, e); + LOG.error("Failed to invoke the On-Scheduled Lifecycle methods of {} due to {}; administratively yielding this " + + "ReportingTask and will attempt to schedule it again after {}", + new Object[] { reportingTask, e.toString(), administrativeYieldDuration }, e); + try { Thread.sleep(administrativeYieldMillis); } catch (final InterruptedException ie) { } } } - - agent.schedule(taskNode, scheduleState); } }; @@ -216,7 +230,6 @@ public final class StandardProcessScheduler implements ProcessScheduler { taskNode.verifyCanStop(); final SchedulingAgent agent = getSchedulingAgent(taskNode.getSchedulingStrategy()); final ReportingTask reportingTask = taskNode.getReportingTask(); - scheduleState.setScheduled(false); taskNode.setScheduledState(ScheduledState.STOPPED); final Runnable unscheduleReportingTaskRunnable = new Runnable() { @@ -225,29 +238,33 @@ public final class StandardProcessScheduler implements ProcessScheduler { public void run() { final ConfigurationContext configurationContext = taskNode.getConfigurationContext(); - try { - try (final NarCloseable x = NarCloseable.withNarLoader()) { - ReflectionUtils.invokeMethodsWithAnnotations(OnUnscheduled.class, org.apache.nifi.processor.annotation.OnUnscheduled.class, reportingTask, configurationContext); - } - } catch (final Exception e) { - final Throwable cause = e instanceof InvocationTargetException ? e.getCause() : e; - final ComponentLog componentLog = new SimpleProcessLogger(reportingTask.getIdentifier(), reportingTask); - componentLog.error("Failed to invoke @OnUnscheduled method due to {}", cause); - - LOG.error("Failed to invoke the @OnUnscheduled methods of {} due to {}; administratively yielding this ReportingTask and will attempt to schedule it again after {}", - reportingTask, cause.toString(), administrativeYieldDuration); - LOG.error("", cause); + synchronized (scheduleState) { + scheduleState.setScheduled(false); try { - Thread.sleep(administrativeYieldMillis); - } catch (final InterruptedException ie) { + try (final NarCloseable x = NarCloseable.withNarLoader()) { + ReflectionUtils.invokeMethodsWithAnnotations(OnUnscheduled.class, org.apache.nifi.processor.annotation.OnUnscheduled.class, reportingTask, configurationContext); + } + } catch (final Exception e) { + final Throwable cause = e instanceof InvocationTargetException ? e.getCause() : e; + final ComponentLog componentLog = new SimpleProcessLogger(reportingTask.getIdentifier(), reportingTask); + componentLog.error("Failed to invoke @OnUnscheduled method due to {}", cause); + + LOG.error("Failed to invoke the @OnUnscheduled methods of {} due to {}; administratively yielding this ReportingTask and will attempt to schedule it again after {}", + reportingTask, cause.toString(), administrativeYieldDuration); + LOG.error("", cause); + + try { + Thread.sleep(administrativeYieldMillis); + } catch (final InterruptedException ie) { + } } - } - agent.unschedule(taskNode, scheduleState); + agent.unschedule(taskNode, scheduleState); - if (scheduleState.getActiveThreadCount() == 0 && scheduleState.mustCallOnStoppedMethods()) { - ReflectionUtils.quietlyInvokeMethodsWithAnnotations(OnStopped.class, org.apache.nifi.processor.annotation.OnStopped.class, reportingTask, configurationContext); + if (scheduleState.getActiveThreadCount() == 0 && scheduleState.mustCallOnStoppedMethods()) { + ReflectionUtils.quietlyInvokeMethodsWithAnnotations(OnStopped.class, org.apache.nifi.processor.annotation.OnStopped.class, reportingTask, configurationContext); + } } } }; @@ -694,6 +711,7 @@ public final class StandardProcessScheduler implements ProcessScheduler { } } } + } }; diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java new file mode 100644 index 0000000000..72eaa84531 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.controller.scheduling; + +import static org.junit.Assert.assertTrue; + +import java.util.UUID; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.controller.Heartbeater; +import org.apache.nifi.controller.ReportingTaskNode; +import org.apache.nifi.controller.ValidationContextFactory; +import org.apache.nifi.controller.reporting.StandardReportingInitializationContext; +import org.apache.nifi.controller.reporting.StandardReportingTaskNode; +import org.apache.nifi.controller.service.ControllerServiceProvider; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.StandardValidationContextFactory; +import org.apache.nifi.reporting.AbstractReportingTask; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.reporting.ReportingContext; +import org.apache.nifi.reporting.ReportingInitializationContext; +import org.apache.nifi.scheduling.SchedulingStrategy; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +public class TestStandardProcessScheduler { + private StandardProcessScheduler scheduler = null; + private ReportingTaskNode taskNode = null; + private TestReportingTask reportingTask = null; + + @Before + public void setup() throws InitializationException { + System.setProperty("nifi.properties.file.path", "src/test/resources/nifi.properties"); + scheduler = new StandardProcessScheduler(Mockito.mock(Heartbeater.class), Mockito.mock(ControllerServiceProvider.class), null); + scheduler.setSchedulingAgent(SchedulingStrategy.TIMER_DRIVEN, Mockito.mock(SchedulingAgent.class)); + + reportingTask = new TestReportingTask(); + final ReportingInitializationContext config = new StandardReportingInitializationContext(UUID.randomUUID().toString(), "Test", SchedulingStrategy.TIMER_DRIVEN, "5 secs", + Mockito.mock(ComponentLog.class), null); + reportingTask.initialize(config); + + final ValidationContextFactory validationContextFactory = new StandardValidationContextFactory(null); + taskNode = new StandardReportingTaskNode(reportingTask, UUID.randomUUID().toString(), null, scheduler, validationContextFactory); + } + + /** + * We have run into an issue where a Reporting Task is scheduled to run but throws an Exception + * from a method with the @OnScheduled annotation. User stops Reporting Task, updates configuration + * to fix the issue. Reporting Task then finishes running @OnSchedule method and is then scheduled to run. + * This unit test is intended to verify that we have this resolved. + */ + @Test + public void testReportingTaskDoesntKeepRunningAfterStop() throws InterruptedException, InitializationException { + scheduler.schedule(taskNode); + + // Let it try to run a few times. + Thread.sleep(1000L); + + scheduler.unschedule(taskNode); + + final int attempts = reportingTask.onScheduleAttempts.get(); + // give it a sec to make sure that it's finished running. + Thread.sleep(1500L); + final int attemptsAfterStop = reportingTask.onScheduleAttempts.get() - attempts; + + // allow 1 extra run, due to timing issues that could call it as it's being stopped. + assertTrue("After unscheduling Reporting Task, task ran an additional " + attemptsAfterStop + " times", attemptsAfterStop <= 1); + } + + + private class TestReportingTask extends AbstractReportingTask { + private final AtomicBoolean failOnScheduled = new AtomicBoolean(true); + private final AtomicInteger onScheduleAttempts = new AtomicInteger(0); + private final AtomicInteger triggerCount = new AtomicInteger(0); + + @OnScheduled + public void onScheduled() { + onScheduleAttempts.incrementAndGet(); + + if (failOnScheduled.get()) { + throw new RuntimeException("Intentional Exception for testing purposes"); + } + } + + @Override + public void onTrigger(final ReportingContext context) { + triggerCount.getAndIncrement(); + } + } +} diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/nifi.properties b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/nifi.properties index e7ea9b62a2..5d3344deb7 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/nifi.properties +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/nifi.properties @@ -20,7 +20,7 @@ nifi.flow.configuration.archive.dir=./target/archive/ nifi.flowcontroller.autoResumeState=true nifi.flowcontroller.graceful.shutdown.period=10 sec nifi.flowservice.writedelay.interval=2 sec -nifi.administrative.yield.duration=30 sec +nifi.administrative.yield.duration=500 millis nifi.reporting.task.configuration.file=./target/reporting-tasks.xml nifi.controller.service.configuration.file=./target/controller-services.xml