NIFI-9227 Run Once not working when scheduling strategy is CRON or Event driven (#5445)

This commit is contained in:
Hsin-Ying Lee 2022-02-25 01:27:01 +08:00 committed by GitHub
parent 39483b9c12
commit c331f9cb9a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 24 additions and 7 deletions

View File

@ -174,12 +174,10 @@ public class QuartzSchedulingAgent extends AbstractTimeBasedSchedulingAgent {
private void unschedule(final Object scheduled, final LifecycleState scheduleState) {
final List<AtomicBoolean> triggers = canceledTriggers.remove(scheduled);
if (triggers == null) {
throw new IllegalStateException("Cannot unschedule " + scheduled + " because it was not scheduled to run");
}
for (final AtomicBoolean trigger : triggers) {
trigger.set(true);
if (triggers != null) {
for (final AtomicBoolean trigger : triggers) {
trigger.set(true);
}
}
scheduleState.setScheduled(false);

View File

@ -416,6 +416,12 @@ public class NiFiClientUtil {
return updateProcessorConfig(currentEntity, config);
}
public ProcessorEntity updateProcessorSchedulingStrategy(final ProcessorEntity currentEntity, final String schedulingStrategy) throws NiFiClientException, IOException {
final ProcessorConfigDTO config = new ProcessorConfigDTO();
config.setSchedulingStrategy(schedulingStrategy);
return updateProcessorConfig(currentEntity, config);
}
public ProcessorEntity updateProcessorConfig(final ProcessorEntity currentEntity, final ProcessorConfigDTO config) throws NiFiClientException, IOException {
final ProcessorDTO processorDto = new ProcessorDTO();
processorDto.setConfig(config);

View File

@ -28,7 +28,7 @@ import static org.junit.Assert.assertEquals;
public class RunOnceIT extends NiFiSystemIT {
@Test
@Test(timeout = 10000L)
public void testRunOnce() throws NiFiClientException, IOException, InterruptedException {
ProcessorEntity generate = getClientUtil().createProcessor("GenerateFlowFile");
getClientUtil().updateProcessorSchedulingPeriod(generate, "1 sec");
@ -46,5 +46,18 @@ public class RunOnceIT extends NiFiSystemIT {
assertEquals("Stopped", actualRunStatus);
assertEquals(1, getConnectionQueueSize(generateToTerminate.getId()));
// Test CRON_DRIVEN Strategy
getClientUtil().updateProcessorSchedulingStrategy(generate, "CRON_DRIVEN");
getClientUtil().updateProcessorSchedulingPeriod(generate, "* * * * * ?");
getNifiClient().getProcessorClient().runProcessorOnce(generate);
waitForQueueCount(generateToTerminate.getId(), 2);
actualRunStatus = actualGenerate.getStatus().getRunStatus();
assertEquals("Stopped", actualRunStatus);
assertEquals(2, getConnectionQueueSize(generateToTerminate.getId()));
}
}