NIFI-720: Ensure that if Reporting Task stopped while @OnScheduled method is failing that it does not start running when configuration is fixed

This commit is contained in:
Mark Payne 2015-06-23 16:23:01 -04:00
parent 2a4e5e1285
commit 8da7327188
3 changed files with 155 additions and 30 deletions

View File

@ -174,31 +174,45 @@ public final class StandardProcessScheduler implements ProcessScheduler {
@SuppressWarnings("deprecation") @SuppressWarnings("deprecation")
@Override @Override
public void run() { public void run() {
final long lastStopTime = scheduleState.getLastStopTime();
final ReportingTask reportingTask = taskNode.getReportingTask();
// Continually attempt to start the Reporting Task, and if we fail sleep for a bit each time. // Continually attempt to start the Reporting Task, and if we fail sleep for a bit each time.
while (true) { while (true) {
final ReportingTask reportingTask = taskNode.getReportingTask();
try { try {
try (final NarCloseable x = NarCloseable.withNarLoader()) { synchronized (scheduleState) {
ReflectionUtils.invokeMethodsWithAnnotations(OnScheduled.class, OnConfigured.class, reportingTask, taskNode.getConfigurationContext()); // if no longer scheduled to run, then we're finished. This can happen, for example,
} // if the @OnScheduled method throws an Exception and the user stops the reporting task
// while we're administratively yielded.
// we also check if the schedule state's last start time is equal to what it was before.
// if not, then means that the reporting task has been stopped and started again, so we should just
// bail; another thread will be responsible for invoking the @OnScheduled methods.
if (!scheduleState.isScheduled() || scheduleState.getLastStopTime() != lastStopTime) {
return;
}
break; try (final NarCloseable x = NarCloseable.withNarLoader()) {
ReflectionUtils.invokeMethodsWithAnnotations(OnScheduled.class, OnConfigured.class, reportingTask, taskNode.getConfigurationContext());
}
agent.schedule(taskNode, scheduleState);
return;
}
} catch (final Exception e) { } catch (final Exception e) {
final Throwable cause = e instanceof InvocationTargetException ? e.getCause() : e; final Throwable cause = e instanceof InvocationTargetException ? e.getCause() : e;
final ComponentLog componentLog = new SimpleProcessLogger(reportingTask.getIdentifier(), reportingTask); final ComponentLog componentLog = new SimpleProcessLogger(reportingTask.getIdentifier(), reportingTask);
componentLog.error("Failed to invoke @OnEnabled method due to {}", cause); componentLog.error("Failed to invoke @OnEnabled method due to {}", cause);
LOG.error("Failed to invoke the On-Scheduled Lifecycle methods of {} due to {}; administratively yielding this ReportingTask and will attempt to schedule it again after {}", LOG.error("Failed to invoke the On-Scheduled Lifecycle methods of {} due to {}; administratively yielding this "
new Object[]{reportingTask, e.toString(), administrativeYieldDuration}, e); + "ReportingTask and will attempt to schedule it again after {}",
new Object[] { reportingTask, e.toString(), administrativeYieldDuration }, e);
try { try {
Thread.sleep(administrativeYieldMillis); Thread.sleep(administrativeYieldMillis);
} catch (final InterruptedException ie) { } catch (final InterruptedException ie) {
} }
} }
} }
agent.schedule(taskNode, scheduleState);
} }
}; };
@ -216,7 +230,6 @@ public final class StandardProcessScheduler implements ProcessScheduler {
taskNode.verifyCanStop(); taskNode.verifyCanStop();
final SchedulingAgent agent = getSchedulingAgent(taskNode.getSchedulingStrategy()); final SchedulingAgent agent = getSchedulingAgent(taskNode.getSchedulingStrategy());
final ReportingTask reportingTask = taskNode.getReportingTask(); final ReportingTask reportingTask = taskNode.getReportingTask();
scheduleState.setScheduled(false);
taskNode.setScheduledState(ScheduledState.STOPPED); taskNode.setScheduledState(ScheduledState.STOPPED);
final Runnable unscheduleReportingTaskRunnable = new Runnable() { final Runnable unscheduleReportingTaskRunnable = new Runnable() {
@ -225,29 +238,33 @@ public final class StandardProcessScheduler implements ProcessScheduler {
public void run() { public void run() {
final ConfigurationContext configurationContext = taskNode.getConfigurationContext(); final ConfigurationContext configurationContext = taskNode.getConfigurationContext();
try { synchronized (scheduleState) {
try (final NarCloseable x = NarCloseable.withNarLoader()) { scheduleState.setScheduled(false);
ReflectionUtils.invokeMethodsWithAnnotations(OnUnscheduled.class, org.apache.nifi.processor.annotation.OnUnscheduled.class, reportingTask, configurationContext);
}
} catch (final Exception e) {
final Throwable cause = e instanceof InvocationTargetException ? e.getCause() : e;
final ComponentLog componentLog = new SimpleProcessLogger(reportingTask.getIdentifier(), reportingTask);
componentLog.error("Failed to invoke @OnUnscheduled method due to {}", cause);
LOG.error("Failed to invoke the @OnUnscheduled methods of {} due to {}; administratively yielding this ReportingTask and will attempt to schedule it again after {}",
reportingTask, cause.toString(), administrativeYieldDuration);
LOG.error("", cause);
try { try {
Thread.sleep(administrativeYieldMillis); try (final NarCloseable x = NarCloseable.withNarLoader()) {
} catch (final InterruptedException ie) { ReflectionUtils.invokeMethodsWithAnnotations(OnUnscheduled.class, org.apache.nifi.processor.annotation.OnUnscheduled.class, reportingTask, configurationContext);
}
} catch (final Exception e) {
final Throwable cause = e instanceof InvocationTargetException ? e.getCause() : e;
final ComponentLog componentLog = new SimpleProcessLogger(reportingTask.getIdentifier(), reportingTask);
componentLog.error("Failed to invoke @OnUnscheduled method due to {}", cause);
LOG.error("Failed to invoke the @OnUnscheduled methods of {} due to {}; administratively yielding this ReportingTask and will attempt to schedule it again after {}",
reportingTask, cause.toString(), administrativeYieldDuration);
LOG.error("", cause);
try {
Thread.sleep(administrativeYieldMillis);
} catch (final InterruptedException ie) {
}
} }
}
agent.unschedule(taskNode, scheduleState); agent.unschedule(taskNode, scheduleState);
if (scheduleState.getActiveThreadCount() == 0 && scheduleState.mustCallOnStoppedMethods()) { if (scheduleState.getActiveThreadCount() == 0 && scheduleState.mustCallOnStoppedMethods()) {
ReflectionUtils.quietlyInvokeMethodsWithAnnotations(OnStopped.class, org.apache.nifi.processor.annotation.OnStopped.class, reportingTask, configurationContext); ReflectionUtils.quietlyInvokeMethodsWithAnnotations(OnStopped.class, org.apache.nifi.processor.annotation.OnStopped.class, reportingTask, configurationContext);
}
} }
} }
}; };
@ -694,6 +711,7 @@ public final class StandardProcessScheduler implements ProcessScheduler {
} }
} }
} }
} }
}; };

View File

@ -0,0 +1,107 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller.scheduling;
import static org.junit.Assert.assertTrue;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.controller.Heartbeater;
import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.controller.ValidationContextFactory;
import org.apache.nifi.controller.reporting.StandardReportingInitializationContext;
import org.apache.nifi.controller.reporting.StandardReportingTaskNode;
import org.apache.nifi.controller.service.ControllerServiceProvider;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.StandardValidationContextFactory;
import org.apache.nifi.reporting.AbstractReportingTask;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.reporting.ReportingContext;
import org.apache.nifi.reporting.ReportingInitializationContext;
import org.apache.nifi.scheduling.SchedulingStrategy;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
public class TestStandardProcessScheduler {
private StandardProcessScheduler scheduler = null;
private ReportingTaskNode taskNode = null;
private TestReportingTask reportingTask = null;
@Before
public void setup() throws InitializationException {
System.setProperty("nifi.properties.file.path", "src/test/resources/nifi.properties");
scheduler = new StandardProcessScheduler(Mockito.mock(Heartbeater.class), Mockito.mock(ControllerServiceProvider.class), null);
scheduler.setSchedulingAgent(SchedulingStrategy.TIMER_DRIVEN, Mockito.mock(SchedulingAgent.class));
reportingTask = new TestReportingTask();
final ReportingInitializationContext config = new StandardReportingInitializationContext(UUID.randomUUID().toString(), "Test", SchedulingStrategy.TIMER_DRIVEN, "5 secs",
Mockito.mock(ComponentLog.class), null);
reportingTask.initialize(config);
final ValidationContextFactory validationContextFactory = new StandardValidationContextFactory(null);
taskNode = new StandardReportingTaskNode(reportingTask, UUID.randomUUID().toString(), null, scheduler, validationContextFactory);
}
/**
* We have run into an issue where a Reporting Task is scheduled to run but throws an Exception
* from a method with the @OnScheduled annotation. User stops Reporting Task, updates configuration
* to fix the issue. Reporting Task then finishes running @OnSchedule method and is then scheduled to run.
* This unit test is intended to verify that we have this resolved.
*/
@Test
public void testReportingTaskDoesntKeepRunningAfterStop() throws InterruptedException, InitializationException {
scheduler.schedule(taskNode);
// Let it try to run a few times.
Thread.sleep(1000L);
scheduler.unschedule(taskNode);
final int attempts = reportingTask.onScheduleAttempts.get();
// give it a sec to make sure that it's finished running.
Thread.sleep(1500L);
final int attemptsAfterStop = reportingTask.onScheduleAttempts.get() - attempts;
// allow 1 extra run, due to timing issues that could call it as it's being stopped.
assertTrue("After unscheduling Reporting Task, task ran an additional " + attemptsAfterStop + " times", attemptsAfterStop <= 1);
}
private class TestReportingTask extends AbstractReportingTask {
private final AtomicBoolean failOnScheduled = new AtomicBoolean(true);
private final AtomicInteger onScheduleAttempts = new AtomicInteger(0);
private final AtomicInteger triggerCount = new AtomicInteger(0);
@OnScheduled
public void onScheduled() {
onScheduleAttempts.incrementAndGet();
if (failOnScheduled.get()) {
throw new RuntimeException("Intentional Exception for testing purposes");
}
}
@Override
public void onTrigger(final ReportingContext context) {
triggerCount.getAndIncrement();
}
}
}

View File

@ -20,7 +20,7 @@ nifi.flow.configuration.archive.dir=./target/archive/
nifi.flowcontroller.autoResumeState=true nifi.flowcontroller.autoResumeState=true
nifi.flowcontroller.graceful.shutdown.period=10 sec nifi.flowcontroller.graceful.shutdown.period=10 sec
nifi.flowservice.writedelay.interval=2 sec nifi.flowservice.writedelay.interval=2 sec
nifi.administrative.yield.duration=30 sec nifi.administrative.yield.duration=500 millis
nifi.reporting.task.configuration.file=./target/reporting-tasks.xml nifi.reporting.task.configuration.file=./target/reporting-tasks.xml
nifi.controller.service.configuration.file=./target/controller-services.xml nifi.controller.service.configuration.file=./target/controller-services.xml