More review feedback from Ryan:

* Removed unnecessary call to configure our channel
* Removed call to context.yield() when Flume reports a backoff
* Handled the session factory changing when using a event driven source.

Signed-off-by: Matt Gilman <matt.c.gilman@gmail.com>
This commit is contained in:
Joey Echeverria 2015-06-10 15:08:03 -07:00 committed by Matt Gilman
parent c4dd1e65b1
commit 16134a2dfe
2 changed files with 6 additions and 5 deletions

View File

@ -104,7 +104,6 @@ public class FlumeSinkProcessor extends AbstractFlumeProcessor {
public void onScheduled(final SchedulingContext context) {
try {
channel = new NifiSinkSessionChannel(SUCCESS, FAILURE);
Configurables.configure(channel, new Context());
channel.start();
sink = SINK_FACTORY.create(context.getProperty(SOURCE_NAME).getValue(),
@ -135,9 +134,7 @@ public class FlumeSinkProcessor extends AbstractFlumeProcessor {
channel.setSession(session);
try {
if (sink.process() == Sink.Status.BACKOFF) {
context.yield();
}
sink.process();
} catch (EventDeliveryException ex) {
throw new ProcessException("Flume event delivery failed", ex);
}

View File

@ -165,7 +165,11 @@ public class FlumeSourceProcessor extends AbstractFlumeProcessor {
super.onTrigger(context, sessionFactory);
} else if (source instanceof EventDrivenSource) {
ProcessSessionFactory old = sessionFactoryRef.getAndSet(sessionFactory);
if (old == null) {
if (old != sessionFactory) {
if (runnerRef.get() != null) {
stopped();
}
runnerRef.set(new EventDrivenSourceRunner());
eventDrivenSourceChannelRef.set(new NifiSessionFactoryChannel(sessionFactoryRef.get(), SUCCESS));
eventDrivenSourceChannelRef.get().start();