diff --git a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSinkProcessor.java b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSinkProcessor.java index e385921d04..9ec1b071a3 100644 --- a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSinkProcessor.java +++ b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSinkProcessor.java @@ -28,7 +28,7 @@ import org.apache.flume.conf.Configurables; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnScheduled; -import org.apache.nifi.annotation.lifecycle.OnUnscheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.Validator; @@ -124,15 +124,14 @@ public class FlumeSinkProcessor extends AbstractFlumeProcessor { } } - @OnUnscheduled - public void unScheduled() { + @OnStopped + public void stopped() { sink.stop(); channel.stop(); } @Override - public void onTrigger(final ProcessContext context, - final ProcessSession session) throws ProcessException { + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { channel.setSession(session); try { diff --git a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSourceProcessor.java b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSourceProcessor.java index 3ded2085b3..1ebf05cfa9 100644 --- a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSourceProcessor.java +++ b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSourceProcessor.java @@ -33,7 +33,6 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.lifecycle.OnStopped; -import org.apache.nifi.annotation.lifecycle.OnUnscheduled; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.Validator; @@ -93,7 +92,6 @@ public class FlumeSourceProcessor extends AbstractFlumeProcessor { private final AtomicReference sessionFactoryRef = new AtomicReference<>(null); private final AtomicReference runnerRef = new AtomicReference<>(null); private final AtomicReference eventDrivenSourceChannelRef = new AtomicReference<>(null); - private final AtomicReference stopping = new AtomicReference<>(false); @Override protected void init(final ProcessorInitializationContext context) { @@ -114,7 +112,6 @@ public class FlumeSourceProcessor extends AbstractFlumeProcessor { @OnScheduled public void onScheduled(final SchedulingContext context) { try { - stopping.set(false); source = SOURCE_FACTORY.create( context.getProperty(SOURCE_NAME) .getValue(), @@ -142,9 +139,8 @@ public class FlumeSourceProcessor extends AbstractFlumeProcessor { } } - @OnUnscheduled - public void unScheduled() { - stopping.set(true); + @OnStopped + public void stopped() { if (source instanceof PollableSource) { source.stop(); } else { @@ -160,10 +156,6 @@ public class FlumeSourceProcessor extends AbstractFlumeProcessor { eventDrivenSourceChannelRef.compareAndSet(eventDrivenSourceChannel, null); } } - } - - @OnStopped - public void stopped() { sessionFactoryRef.set(null); } @@ -176,14 +168,11 @@ public class FlumeSourceProcessor extends AbstractFlumeProcessor { if (old == null) { runnerRef.set(new EventDrivenSourceRunner()); eventDrivenSourceChannelRef.set(new NifiSessionFactoryChannel(sessionFactoryRef.get(), SUCCESS)); - eventDrivenSourceChannelRef.get() - .start(); - source.setChannelProcessor(new ChannelProcessor(new NifiChannelSelector( - eventDrivenSourceChannelRef.get()))); - runnerRef.get() - .setSource(source); - runnerRef.get() - .start(); + eventDrivenSourceChannelRef.get().start(); + source.setChannelProcessor(new ChannelProcessor( + new NifiChannelSelector(eventDrivenSourceChannelRef.get()))); + runnerRef.get().setSource(source); + runnerRef.get().start(); } } } diff --git a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/FlumeSourceProcessorTest.java b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/FlumeSourceProcessorTest.java index 32feb1e28a..bf32095f7b 100644 --- a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/FlumeSourceProcessorTest.java +++ b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/FlumeSourceProcessorTest.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.util.Collection; import java.util.HashSet; import java.util.List; +import java.util.concurrent.TimeUnit; import org.apache.flume.sink.NullSink; import org.apache.flume.source.AvroSource; @@ -126,9 +127,24 @@ public class FlumeSourceProcessorTest { runner.setProperty(FlumeSourceProcessor.SOURCE_TYPE, "spooldir"); runner.setProperty(FlumeSinkProcessor.FLUME_CONFIG, "tier1.sources.src-1.spoolDir = " + spoolDirectory.getAbsolutePath()); - runner.run(); - // No data will be transfered because of how quickly the test runner - // starts shutting down - runner.assertTransferCount(FlumeSourceProcessor.SUCCESS, 0); + runner.run(1, false, true); + // Because the spool directory source is an event driven source, it may take some time for flow files to get + // produced. I'm willing to wait up to 5 seconds, but will bail out early if possible. If it takes longer than + // that then there is likely a bug. + int numWaits = 10; + while (runner.getFlowFilesForRelationship(FlumeSourceProcessor.SUCCESS).size() < 4 && --numWaits > 0) { + try { + TimeUnit.MILLISECONDS.sleep(500); + } catch (InterruptedException ex) { + logger.warn("Sleep interrupted"); + } + } + runner.shutdown(); + runner.assertTransferCount(FlumeSourceProcessor.SUCCESS, 4); + int i = 1; + for (MockFlowFile flowFile : runner.getFlowFilesForRelationship(FlumeSourceProcessor.SUCCESS)) { + flowFile.assertContentEquals("record " + i); + i++; + } } }