NIFI-11392 for CRON scheduled components, improved cancellation of … (#7232)

NIFI-11392 for CRON scheduled components, improved cancellation of futures for thread cleanup
This commit is contained in:
Michael Moser 2023-05-30 11:09:06 -04:00 committed by GitHub
parent 3bd4b49abe
commit eca4f5d68f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 33 additions and 42 deletions

View File

@ -25,16 +25,15 @@ import org.apache.nifi.engine.FlowEngine;
import org.apache.nifi.processor.exception.ProcessException;
import org.quartz.CronExpression;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
public class QuartzSchedulingAgent extends AbstractTimeBasedSchedulingAgent {
private final Map<Object, List<AtomicBoolean>> canceledTriggers = new HashMap<>();
private final Map<Object, Map<Integer, ScheduledFuture<?>>> quartzFutures = new HashMap<>();
public QuartzSchedulingAgent(final FlowController flowController, final FlowEngine flowEngine, final RepositoryContextFactory contextFactory) {
super(flowEngine, flowController, contextFactory);
@ -42,12 +41,19 @@ public class QuartzSchedulingAgent extends AbstractTimeBasedSchedulingAgent {
@Override
public void shutdown() {
quartzFutures.values().forEach(map -> map.values().forEach(future -> {
if (!future.isCancelled()) {
// stop scheduling to run and interrupt currently running tasks.
future.cancel(true);
}
}));
flowEngine.shutdown();
}
@Override
public void doSchedule(final ReportingTaskNode taskNode, final LifecycleState scheduleState) {
final List<AtomicBoolean> existingTriggers = canceledTriggers.get(taskNode);
if (existingTriggers != null) {
final Map<Integer, ScheduledFuture<?>> componentFuturesMap = quartzFutures.computeIfAbsent(taskNode, k -> new HashMap<>());
if (!componentFuturesMap.values().isEmpty()) {
throw new IllegalStateException("Cannot schedule " + taskNode.getReportingTask().getIdentifier() + " because it is already scheduled to run");
}
@ -61,7 +67,6 @@ public class QuartzSchedulingAgent extends AbstractTimeBasedSchedulingAgent {
final ReportingTaskWrapper taskWrapper = new ReportingTaskWrapper(taskNode, scheduleState, flowController.getExtensionManager());
final AtomicBoolean canceled = new AtomicBoolean(false);
final Date initialDate = cronExpression.getTimeAfter(new Date());
final long initialDelay = initialDate.getTime() - System.currentTimeMillis();
@ -71,37 +76,30 @@ public class QuartzSchedulingAgent extends AbstractTimeBasedSchedulingAgent {
@Override
public void run() {
if (canceled.get()) {
return;
}
taskWrapper.run();
if (canceled.get()) {
return;
}
nextSchedule = getNextSchedule(nextSchedule, cronExpression);
final long delay = getDelay(nextSchedule);
logger.debug("Finished running Reporting Task {}; next scheduled time is at {} after a delay of {} milliseconds", taskNode, nextSchedule, delay);
flowEngine.schedule(this, delay, TimeUnit.MILLISECONDS);
final ScheduledFuture<?> newFuture = flowEngine.schedule(this, delay, TimeUnit.MILLISECONDS);
final ScheduledFuture<?> oldFuture = componentFuturesMap.put(0, newFuture);
scheduleState.replaceFuture(oldFuture, newFuture);
}
};
final List<AtomicBoolean> triggers = new ArrayList<>(1);
triggers.add(canceled);
canceledTriggers.put(taskNode, triggers);
final ScheduledFuture<?> future = flowEngine.schedule(command, initialDelay, TimeUnit.MILLISECONDS);
componentFuturesMap.put(0, future);
flowEngine.schedule(command, initialDelay, TimeUnit.MILLISECONDS);
scheduleState.setScheduled(true);
scheduleState.setFutures(componentFuturesMap.values());
logger.info("Scheduled Reporting Task {} to run threads on schedule {}", taskNode, cronSchedule);
}
@Override
public synchronized void doSchedule(final Connectable connectable, final LifecycleState scheduleState) {
final List<AtomicBoolean> existingTriggers = canceledTriggers.get(connectable);
if (existingTriggers != null) {
final Map<Integer, ScheduledFuture<?>> componentFuturesMap = quartzFutures.computeIfAbsent(connectable, k -> new HashMap<>());
if (!componentFuturesMap.values().isEmpty()) {
throw new IllegalStateException("Cannot schedule " + connectable + " because it is already scheduled to run");
}
@ -114,11 +112,10 @@ public class QuartzSchedulingAgent extends AbstractTimeBasedSchedulingAgent {
throw new IllegalStateException("Cannot schedule " + connectable + " to run because its scheduling period is not valid");
}
final List<AtomicBoolean> triggers = new ArrayList<>();
for (int i = 0; i < connectable.getMaxConcurrentTasks(); i++) {
final ConnectableTask continuallyRunTask = new ConnectableTask(this, connectable, flowController, contextFactory, scheduleState);
final AtomicBoolean canceled = new AtomicBoolean(false);
final AtomicInteger taskNumber = new AtomicInteger(i);
final Date initialDate = cronExpression.getTimeAfter(new Date());
final long initialDelay = initialDate.getTime() - System.currentTimeMillis();
@ -129,10 +126,6 @@ public class QuartzSchedulingAgent extends AbstractTimeBasedSchedulingAgent {
@Override
public void run() {
if (canceled.get()) {
return;
}
try {
continuallyRunTask.invoke();
} catch (final RuntimeException re) {
@ -141,24 +134,21 @@ public class QuartzSchedulingAgent extends AbstractTimeBasedSchedulingAgent {
throw new ProcessException(e);
}
if (canceled.get()) {
return;
}
nextSchedule = getNextSchedule(nextSchedule, cronExpression);
final long delay = getDelay(nextSchedule);
logger.debug("Finished task for {}; next scheduled time is at {} after a delay of {} milliseconds", connectable, nextSchedule, delay);
flowEngine.schedule(this, delay, TimeUnit.MILLISECONDS);
final ScheduledFuture<?> newFuture = flowEngine.schedule(this, delay, TimeUnit.MILLISECONDS);
final ScheduledFuture<?> oldFuture = componentFuturesMap.put(taskNumber.get(), newFuture);
scheduleState.replaceFuture(oldFuture, newFuture);
}
};
flowEngine.schedule(command, initialDelay, TimeUnit.MILLISECONDS);
triggers.add(canceled);
final ScheduledFuture<?> future = flowEngine.schedule(command, initialDelay, TimeUnit.MILLISECONDS);
componentFuturesMap.put(taskNumber.get(), future);
}
canceledTriggers.put(connectable, triggers);
scheduleState.setFutures(componentFuturesMap.values());
logger.info("Scheduled {} to run with {} threads on schedule {}", connectable, connectable.getMaxConcurrentTasks(), cronSchedule);
}
@ -173,12 +163,13 @@ public class QuartzSchedulingAgent extends AbstractTimeBasedSchedulingAgent {
}
private void unschedule(final Object scheduled, final LifecycleState scheduleState) {
final List<AtomicBoolean> triggers = canceledTriggers.remove(scheduled);
if (triggers != null) {
for (final AtomicBoolean trigger : triggers) {
trigger.set(true);
quartzFutures.remove(scheduled);
scheduleState.getFutures().forEach(future -> {
if (!future.isCancelled()) {
// stop scheduling to run but do not interrupt currently running tasks.
future.cancel(false);
}
}
});
scheduleState.setScheduled(false);
logger.info("Stopped scheduling {} to run", scheduled);