diff --git a/nifi/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java b/nifi/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java index 655f2df5d9..89385477d7 100644 --- a/nifi/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java +++ b/nifi/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java @@ -39,6 +39,7 @@ import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -182,6 +183,11 @@ public class StandardProcessorTestRunner implements TestRunner { @Override public void run(final int iterations, final boolean stopOnFinish, final boolean initialize) { + run(iterations, stopOnFinish, initialize, 5000); + } + + @Override + public void run(final int iterations, final boolean stopOnFinish, final boolean initialize, final long runWait) { if (iterations < 1) { throw new IllegalArgumentException(); } @@ -207,6 +213,10 @@ public class StandardProcessorTestRunner implements TestRunner { } executorService.shutdown(); + try { + executorService.awaitTermination(runWait, TimeUnit.MILLISECONDS); + } catch (InterruptedException e1) { + } int finishedCount = 0; boolean unscheduledRun = false; diff --git a/nifi/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java b/nifi/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java index a599e5bbdd..fb9fc78600 100644 --- a/nifi/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java +++ b/nifi/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java @@ -125,6 +125,51 @@ public interface TestRunner { */ void run(int iterations, boolean stopOnFinish, final boolean initialize); + /** + * This method runs the {@link Processor} iterations times, + * using the sequence of steps below: + * + * + * @param iterations number of iterations + * @param stopOnFinish whether or not to run the Processor methods that are + * annotated with {@link nifi.processor.annotation.OnStopped @OnStopped} + * @param initialize true if must initialize + * @param runWait indicates the amount of time in milliseconds that the framework should wait for + * processors to stop running before calling the {@link nifi.processor.annotation.OnUnscheduled @OnUnscheduled} annotation + */ + void run(int iterations, boolean stopOnFinish, final boolean initialize, final long runWait); + /** * Invokes all methods on the Processor that are annotated with the * {@link nifi.processor.annotation.OnShutdown @OnShutdown} annotation. If diff --git a/nifi/nifi-mock/src/test/java/org/apache/nifi/util/CurrentTestStandardProcessorTestRunner.java b/nifi/nifi-mock/src/test/java/org/apache/nifi/util/CurrentTestStandardProcessorTestRunner.java new file mode 100644 index 0000000000..6b403af25c --- /dev/null +++ b/nifi/nifi-mock/src/test/java/org/apache/nifi/util/CurrentTestStandardProcessorTestRunner.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.util; + +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.exception.ProcessException; +import org.junit.Assert; +import org.junit.Test; + +public class CurrentTestStandardProcessorTestRunner { + + /** + * This test will verify that all iterations of the run are finished before unscheduled is called + */ + @Test + public void testOnScheduledCalledAfterRunFinished() { + SlowRunProcessor processor = new SlowRunProcessor(); + StandardProcessorTestRunner runner = new StandardProcessorTestRunner(processor); + final int iterations = 5; + runner.run(iterations); + // if the counter is not equal to iterations, the the processor must have been unscheduled + // before all the run calls were made, that would be bad. + Assert.assertEquals(iterations, processor.getCounter()); + } + + /** + * This processor simulates a "slow" processor that checks whether it is scheduled before doing something + * + * + */ + private static class SlowRunProcessor extends AbstractProcessor { + + private int counter = 0; + + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + + try { + // be slow + Thread.sleep(50); + // make sure we are still scheduled + if (isScheduled()) { + // increment counter + ++counter; + } + } catch (InterruptedException e) { + } + + } + + public int getCounter() { + return counter; + } + } +}