Moved the source/sink stoppign to @OnStopped methods

* Made the spoolDirectory test stronger

Signed-off-by: Matt Gilman <matt.c.gilman@gmail.com>
This commit is contained in:
Joey Echeverria 2015-06-10 14:18:42 -07:00 committed by Matt Gilman
parent 3af73c9b82
commit c4dd1e65b1
3 changed files with 31 additions and 27 deletions

View File

@ -28,7 +28,7 @@ import org.apache.flume.conf.Configurables;
import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnUnscheduled; import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.Validator; import org.apache.nifi.components.Validator;
@ -124,15 +124,14 @@ public class FlumeSinkProcessor extends AbstractFlumeProcessor {
} }
} }
@OnUnscheduled @OnStopped
public void unScheduled() { public void stopped() {
sink.stop(); sink.stop();
channel.stop(); channel.stop();
} }
@Override @Override
public void onTrigger(final ProcessContext context, public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
final ProcessSession session) throws ProcessException {
channel.setSession(session); channel.setSession(session);
try { try {

View File

@ -33,7 +33,6 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.Validator; import org.apache.nifi.components.Validator;
@ -93,7 +92,6 @@ public class FlumeSourceProcessor extends AbstractFlumeProcessor {
private final AtomicReference<ProcessSessionFactory> sessionFactoryRef = new AtomicReference<>(null); private final AtomicReference<ProcessSessionFactory> sessionFactoryRef = new AtomicReference<>(null);
private final AtomicReference<EventDrivenSourceRunner> runnerRef = new AtomicReference<>(null); private final AtomicReference<EventDrivenSourceRunner> runnerRef = new AtomicReference<>(null);
private final AtomicReference<NifiSessionFactoryChannel> eventDrivenSourceChannelRef = new AtomicReference<>(null); private final AtomicReference<NifiSessionFactoryChannel> eventDrivenSourceChannelRef = new AtomicReference<>(null);
private final AtomicReference<Boolean> stopping = new AtomicReference<>(false);
@Override @Override
protected void init(final ProcessorInitializationContext context) { protected void init(final ProcessorInitializationContext context) {
@ -114,7 +112,6 @@ public class FlumeSourceProcessor extends AbstractFlumeProcessor {
@OnScheduled @OnScheduled
public void onScheduled(final SchedulingContext context) { public void onScheduled(final SchedulingContext context) {
try { try {
stopping.set(false);
source = SOURCE_FACTORY.create( source = SOURCE_FACTORY.create(
context.getProperty(SOURCE_NAME) context.getProperty(SOURCE_NAME)
.getValue(), .getValue(),
@ -142,9 +139,8 @@ public class FlumeSourceProcessor extends AbstractFlumeProcessor {
} }
} }
@OnUnscheduled @OnStopped
public void unScheduled() { public void stopped() {
stopping.set(true);
if (source instanceof PollableSource) { if (source instanceof PollableSource) {
source.stop(); source.stop();
} else { } else {
@ -160,10 +156,6 @@ public class FlumeSourceProcessor extends AbstractFlumeProcessor {
eventDrivenSourceChannelRef.compareAndSet(eventDrivenSourceChannel, null); eventDrivenSourceChannelRef.compareAndSet(eventDrivenSourceChannel, null);
} }
} }
}
@OnStopped
public void stopped() {
sessionFactoryRef.set(null); sessionFactoryRef.set(null);
} }
@ -176,14 +168,11 @@ public class FlumeSourceProcessor extends AbstractFlumeProcessor {
if (old == null) { if (old == null) {
runnerRef.set(new EventDrivenSourceRunner()); runnerRef.set(new EventDrivenSourceRunner());
eventDrivenSourceChannelRef.set(new NifiSessionFactoryChannel(sessionFactoryRef.get(), SUCCESS)); eventDrivenSourceChannelRef.set(new NifiSessionFactoryChannel(sessionFactoryRef.get(), SUCCESS));
eventDrivenSourceChannelRef.get() eventDrivenSourceChannelRef.get().start();
.start(); source.setChannelProcessor(new ChannelProcessor(
source.setChannelProcessor(new ChannelProcessor(new NifiChannelSelector( new NifiChannelSelector(eventDrivenSourceChannelRef.get())));
eventDrivenSourceChannelRef.get()))); runnerRef.get().setSource(source);
runnerRef.get() runnerRef.get().start();
.setSource(source);
runnerRef.get()
.start();
} }
} }
} }

View File

@ -22,6 +22,7 @@ import java.io.IOException;
import java.util.Collection; import java.util.Collection;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.flume.sink.NullSink; import org.apache.flume.sink.NullSink;
import org.apache.flume.source.AvroSource; import org.apache.flume.source.AvroSource;
@ -126,9 +127,24 @@ public class FlumeSourceProcessorTest {
runner.setProperty(FlumeSourceProcessor.SOURCE_TYPE, "spooldir"); runner.setProperty(FlumeSourceProcessor.SOURCE_TYPE, "spooldir");
runner.setProperty(FlumeSinkProcessor.FLUME_CONFIG, runner.setProperty(FlumeSinkProcessor.FLUME_CONFIG,
"tier1.sources.src-1.spoolDir = " + spoolDirectory.getAbsolutePath()); "tier1.sources.src-1.spoolDir = " + spoolDirectory.getAbsolutePath());
runner.run(); runner.run(1, false, true);
// No data will be transfered because of how quickly the test runner // Because the spool directory source is an event driven source, it may take some time for flow files to get
// starts shutting down // produced. I'm willing to wait up to 5 seconds, but will bail out early if possible. If it takes longer than
runner.assertTransferCount(FlumeSourceProcessor.SUCCESS, 0); // that then there is likely a bug.
int numWaits = 10;
while (runner.getFlowFilesForRelationship(FlumeSourceProcessor.SUCCESS).size() < 4 && --numWaits > 0) {
try {
TimeUnit.MILLISECONDS.sleep(500);
} catch (InterruptedException ex) {
logger.warn("Sleep interrupted");
}
}
runner.shutdown();
runner.assertTransferCount(FlumeSourceProcessor.SUCCESS, 4);
int i = 1;
for (MockFlowFile flowFile : runner.getFlowFilesForRelationship(FlumeSourceProcessor.SUCCESS)) {
flowFile.assertContentEquals("record " + i);
i++;
}
} }
} }