From 16134a2dfedf2f05f7d798082bfd86a5a13bda5a Mon Sep 17 00:00:00 2001 From: Joey Echeverria Date: Wed, 10 Jun 2015 15:08:03 -0700 Subject: [PATCH] More review feedback from Ryan: * Removed unnecessary call to configure our channel * Removed call to context.yield() when Flume reports a backoff * Handled the session factory changing when using a event driven source. Signed-off-by: Matt Gilman --- .../apache/nifi/processors/flume/FlumeSinkProcessor.java | 5 +---- .../apache/nifi/processors/flume/FlumeSourceProcessor.java | 6 +++++- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSinkProcessor.java b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSinkProcessor.java index 9ec1b071a3..2d8506d578 100644 --- a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSinkProcessor.java +++ b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSinkProcessor.java @@ -104,7 +104,6 @@ public class FlumeSinkProcessor extends AbstractFlumeProcessor { public void onScheduled(final SchedulingContext context) { try { channel = new NifiSinkSessionChannel(SUCCESS, FAILURE); - Configurables.configure(channel, new Context()); channel.start(); sink = SINK_FACTORY.create(context.getProperty(SOURCE_NAME).getValue(), @@ -135,9 +134,7 @@ public class FlumeSinkProcessor extends AbstractFlumeProcessor { channel.setSession(session); try { - if (sink.process() == Sink.Status.BACKOFF) { - context.yield(); - } + sink.process(); } catch (EventDeliveryException ex) { throw new ProcessException("Flume event delivery failed", ex); } diff --git a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSourceProcessor.java b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSourceProcessor.java index 1ebf05cfa9..55b1f2f8ec 100644 --- a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSourceProcessor.java +++ b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSourceProcessor.java @@ -165,7 +165,11 @@ public class FlumeSourceProcessor extends AbstractFlumeProcessor { super.onTrigger(context, sessionFactory); } else if (source instanceof EventDrivenSource) { ProcessSessionFactory old = sessionFactoryRef.getAndSet(sessionFactory); - if (old == null) { + if (old != sessionFactory) { + if (runnerRef.get() != null) { + stopped(); + } + runnerRef.set(new EventDrivenSourceRunner()); eventDrivenSourceChannelRef.set(new NifiSessionFactoryChannel(sessionFactoryRef.get(), SUCCESS)); eventDrivenSourceChannelRef.get().start();