NIFI-704 StandardProcessorTestRunner should allow you to wait before calling OnUnScheduled methods

Signed-off-by: Mark Payne <markap14@hotmail.com>
This commit is contained in:
danbress 2015-06-20 19:53:13 -04:00 committed by Mark Payne
parent f2f9056055
commit 5d8bfa7c80
3 changed files with 126 additions and 0 deletions

View File

@ -39,6 +39,7 @@ import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
@ -182,6 +183,11 @@ public class StandardProcessorTestRunner implements TestRunner {
@Override @Override
public void run(final int iterations, final boolean stopOnFinish, final boolean initialize) { public void run(final int iterations, final boolean stopOnFinish, final boolean initialize) {
run(iterations, stopOnFinish, initialize, 5000);
}
@Override
public void run(final int iterations, final boolean stopOnFinish, final boolean initialize, final long runWait) {
if (iterations < 1) { if (iterations < 1) {
throw new IllegalArgumentException(); throw new IllegalArgumentException();
} }
@ -207,6 +213,10 @@ public class StandardProcessorTestRunner implements TestRunner {
} }
executorService.shutdown(); executorService.shutdown();
try {
executorService.awaitTermination(runWait, TimeUnit.MILLISECONDS);
} catch (InterruptedException e1) {
}
int finishedCount = 0; int finishedCount = 0;
boolean unscheduledRun = false; boolean unscheduledRun = false;

View File

@ -125,6 +125,51 @@ public interface TestRunner {
*/ */
void run(int iterations, boolean stopOnFinish, final boolean initialize); void run(int iterations, boolean stopOnFinish, final boolean initialize);
/**
* This method runs the {@link Processor} <code>iterations</code> times,
* using the sequence of steps below:
* <ul>
* <li>
* If {@code initialize} is true, run all methods on the Processor that are
* annotated with the
* {@link nifi.processor.annotation.OnScheduled @OnScheduled} annotation. If
* any of these methods throws an Exception, the Unit Test will fail.
* </li>
* <li>
* Schedule the
* {@link Processor#onTrigger(ProcessContext, ProcessSessionFactory) onTrigger}
* method to be invoked <code>iterations</code> times. The number of threads
* used to run these iterations is determined by the ThreadCount of this
* <code>TestRunner</code>. By default, the value is set to 1, but it can be
* modified by calling the {@link #setThreadCount(int)} method.
* </li>
* <li>
* As soon as the first thread finishes its execution of
* {@link Processor#onTrigger(ProcessContext, ProcessSessionFactory) onTrigger},
* all methods on the Processor that are annotated with the
* {@link nifi.processor.annotation.OnUnscheduled @OnUnscheduled} annotation
* are invoked. If any of these methods throws an Exception, the Unit Test
* will fail.
* </li>
* <li>
* Waits for all threads to finish execution.
* </li>
* <li>
* If and only if the value of <code>shutdown</code> is true: Call all
* methods on the Processor that is annotated with the
* {@link nifi.processor.annotation.OnStopped @OnStopped} annotation.
* </li>
* </ul>
*
* @param iterations number of iterations
* @param stopOnFinish whether or not to run the Processor methods that are
* annotated with {@link nifi.processor.annotation.OnStopped @OnStopped}
* @param initialize true if must initialize
* @param runWait indicates the amount of time in milliseconds that the framework should wait for
* processors to stop running before calling the {@link nifi.processor.annotation.OnUnscheduled @OnUnscheduled} annotation
*/
void run(int iterations, boolean stopOnFinish, final boolean initialize, final long runWait);
/** /**
* Invokes all methods on the Processor that are annotated with the * Invokes all methods on the Processor that are annotated with the
* {@link nifi.processor.annotation.OnShutdown @OnShutdown} annotation. If * {@link nifi.processor.annotation.OnShutdown @OnShutdown} annotation. If

View File

@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.util;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.junit.Assert;
import org.junit.Test;
public class CurrentTestStandardProcessorTestRunner {
/**
* This test will verify that all iterations of the run are finished before unscheduled is called
*/
@Test
public void testOnScheduledCalledAfterRunFinished() {
SlowRunProcessor processor = new SlowRunProcessor();
StandardProcessorTestRunner runner = new StandardProcessorTestRunner(processor);
final int iterations = 5;
runner.run(iterations);
// if the counter is not equal to iterations, the the processor must have been unscheduled
// before all the run calls were made, that would be bad.
Assert.assertEquals(iterations, processor.getCounter());
}
/**
* This processor simulates a "slow" processor that checks whether it is scheduled before doing something
*
*
*/
private static class SlowRunProcessor extends AbstractProcessor {
private int counter = 0;
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
try {
// be slow
Thread.sleep(50);
// make sure we are still scheduled
if (isScheduled()) {
// increment counter
++counter;
}
} catch (InterruptedException e) {
}
}
public int getCounter() {
return counter;
}
}
}