Updated with Ryan's feedback:

* Moved away from any reliance on MemoryChannels in favor of
  modeling the ProcessSession/Relationship as the channel directly
  in all cases.
* Fixed version numbers in nifi-flume-* pom files.

Signed-off-by: Matt Gilman <matt.c.gilman@gmail.com>
This commit is contained in:
Joey Echeverria 2015-06-10 12:49:50 -07:00 committed by Matt Gilman
parent 419f9455a2
commit 3af73c9b82
15 changed files with 445 additions and 277 deletions

View File

@ -17,10 +17,10 @@
<parent> <parent>
<groupId>org.apache.nifi</groupId> <groupId>org.apache.nifi</groupId>
<artifactId>nifi-flume-bundle</artifactId> <artifactId>nifi-flume-bundle</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version> <version>0.2.0-incubating-SNAPSHOT</version>
</parent> </parent>
<artifactId>nifi-flume-nar</artifactId> <artifactId>nifi-flume-nar</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version> <version>0.2.0-incubating-SNAPSHOT</version>
<packaging>nar</packaging> <packaging>nar</packaging>
<dependencies> <dependencies>
<dependency> <dependency>

View File

@ -17,7 +17,7 @@
<parent> <parent>
<groupId>org.apache.nifi</groupId> <groupId>org.apache.nifi</groupId>
<artifactId>nifi-flume-bundle</artifactId> <artifactId>nifi-flume-bundle</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version> <version>0.2.0-incubating-SNAPSHOT</version>
</parent> </parent>
<artifactId>nifi-flume-processors</artifactId> <artifactId>nifi-flume-processors</artifactId>
<packaging>jar</packaging> <packaging>jar</packaging>

View File

@ -33,96 +33,128 @@ import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator; import org.apache.nifi.components.Validator;
import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.OutputStreamCallback; import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processors.flume.util.FlowFileEvent; import org.apache.nifi.processors.flume.util.FlowFileEvent;
/** /**
* This is a base class that is helpful when building processors interacting * This is a base class that is helpful when building processors interacting with Flume.
* with Flume.
*/ */
public abstract class AbstractFlumeProcessor extends AbstractProcessor { public abstract class AbstractFlumeProcessor extends AbstractSessionFactoryProcessor {
protected static final SourceFactory SOURCE_FACTORY = new DefaultSourceFactory();
protected static final SinkFactory SINK_FACTORY = new DefaultSinkFactory();
protected static Event flowFileToEvent(FlowFile flowFile, ProcessSession session) { protected static final SourceFactory SOURCE_FACTORY = new DefaultSourceFactory();
return new FlowFileEvent(flowFile, session); protected static final SinkFactory SINK_FACTORY = new DefaultSinkFactory();
}
protected static void transferEvent(final Event event, ProcessSession session, protected static Event flowFileToEvent(FlowFile flowFile, ProcessSession session) {
Relationship relationship) { return new FlowFileEvent(flowFile, session);
FlowFile flowFile = session.create();
flowFile = session.putAllAttributes(flowFile, event.getHeaders());
flowFile = session.write(flowFile, new OutputStreamCallback() {
@Override
public void process(final OutputStream out) throws IOException {
out.write(event.getBody());
}
});
session.getProvenanceReporter().create(flowFile);
session.transfer(flowFile, relationship);
}
protected static Validator createSourceValidator() {
return new Validator() {
@Override
public ValidationResult validate(final String subject, final String value, final ValidationContext context) {
String reason = null;
try {
FlumeSourceProcessor.SOURCE_FACTORY.create("NiFi Source", value);
} catch (Exception ex) {
reason = ex.getLocalizedMessage();
reason = Character.toLowerCase(reason.charAt(0)) + reason.substring(1);
}
return new ValidationResult.Builder().subject(subject).input(value).explanation(reason).valid(reason == null).build();
}
};
}
protected static Validator createSinkValidator() {
return new Validator() {
@Override
public ValidationResult validate(final String subject, final String value, final ValidationContext context) {
String reason = null;
try {
FlumeSinkProcessor.SINK_FACTORY.create("NiFi Sink", value);
} catch (Exception ex) {
reason = ex.getLocalizedMessage();
reason = Character.toLowerCase(reason.charAt(0)) + reason.substring(1);
}
return new ValidationResult.Builder().subject(subject).input(value).explanation(reason).valid(reason == null).build();
}
};
}
protected static Context getFlumeContext(String flumeConfig, String prefix) {
Properties flumeProperties = new Properties();
if (flumeConfig != null) {
try {
flumeProperties.load(new StringReader(flumeConfig));
} catch (IOException ex) {
throw new RuntimeException(ex);
}
} }
Map<String, String> parameters = Maps.newHashMap();
for (String property : flumeProperties.stringPropertyNames()) { protected static void transferEvent(final Event event, ProcessSession session,
parameters.put(property, flumeProperties.getProperty(property)); Relationship relationship) {
FlowFile flowFile = session.create();
flowFile = session.putAllAttributes(flowFile, event.getHeaders());
flowFile = session.write(flowFile, new OutputStreamCallback() {
@Override
public void process(final OutputStream out) throws IOException {
out.write(event.getBody());
}
});
session.getProvenanceReporter()
.create(flowFile);
session.transfer(flowFile, relationship);
} }
return new Context(new Context(parameters).getSubProperties(prefix));
}
protected static Context getFlumeSourceContext(String flumeConfig, protected static Validator createSourceValidator() {
String agentName, String sourceName) { return new Validator() {
return getFlumeContext(flumeConfig, agentName + ".sources." + sourceName + "."); @Override
} public ValidationResult validate(final String subject, final String value, final ValidationContext context) {
String reason = null;
try {
FlumeSourceProcessor.SOURCE_FACTORY.create("NiFi Source", value);
} catch (Exception ex) {
reason = ex.getLocalizedMessage();
reason = Character.toLowerCase(reason.charAt(0)) + reason.substring(1);
}
return new ValidationResult.Builder().subject(subject)
.input(value)
.explanation(reason)
.valid(reason == null)
.build();
}
};
}
protected static Context getFlumeSinkContext(String flumeConfig, protected static Validator createSinkValidator() {
String agentName, String sinkName) { return new Validator() {
return getFlumeContext(flumeConfig, agentName + ".sinks." + sinkName + "."); @Override
} public ValidationResult validate(final String subject, final String value, final ValidationContext context) {
String reason = null;
try {
FlumeSinkProcessor.SINK_FACTORY.create("NiFi Sink", value);
} catch (Exception ex) {
reason = ex.getLocalizedMessage();
reason = Character.toLowerCase(reason.charAt(0)) + reason.substring(1);
}
return new ValidationResult.Builder().subject(subject)
.input(value)
.explanation(reason)
.valid(reason == null)
.build();
}
};
}
protected static Context getFlumeContext(String flumeConfig, String prefix) {
Properties flumeProperties = new Properties();
if (flumeConfig != null) {
try {
flumeProperties.load(new StringReader(flumeConfig));
} catch (IOException ex) {
throw new RuntimeException(ex);
}
}
Map<String, String> parameters = Maps.newHashMap();
for (String property : flumeProperties.stringPropertyNames()) {
parameters.put(property, flumeProperties.getProperty(property));
}
return new Context(new Context(parameters).getSubProperties(prefix));
}
protected static Context getFlumeSourceContext(String flumeConfig,
String agentName, String sourceName) {
return getFlumeContext(flumeConfig, agentName + ".sources." + sourceName + ".");
}
protected static Context getFlumeSinkContext(String flumeConfig,
String agentName, String sinkName) {
return getFlumeContext(flumeConfig, agentName + ".sinks." + sinkName + ".");
}
/**
 * Runs a single trigger inside a session obtained from {@code sessionFactory}.
 *
 * <p>Borrowed from AbstractProcessor. The FlumeSourceProcessor needs to implement this directly
 * to handle event driven sources, but it's marked final in AbstractProcessor.
 *
 * <p>The work itself is delegated to {@link #onTrigger(ProcessContext, ProcessSession)}; the
 * session is committed when that call returns normally. If anything is thrown, the failure is
 * logged, the session is rolled back (with the {@code true} flag) and the same throwable is
 * rethrown to the caller.
 *
 * @param context the process context handed through to the session-based overload
 * @param sessionFactory factory used to create the session for this trigger
 * @throws ProcessException if the session-based overload or the commit fails with one
 */
@Override
public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
    final ProcessSession triggerSession = sessionFactory.createSession();
    try {
        onTrigger(context, triggerSession);
        triggerSession.commit();
    } catch (final Throwable failure) {
        // Log first so the cause is recorded even if the rollback itself fails.
        final Object[] logArgs = {this, failure};
        getLogger().error("{} failed to process due to {}; rolling back session", logArgs);
        triggerSession.rollback(true);
        throw failure;
    }
}
/**
 * Performs the processor's work using the supplied session.
 *
 * <p>The session-factory overload of {@code onTrigger} in this class creates the session,
 * calls this method, commits the session when it returns, and rolls the session back if it
 * throws.
 *
 * @param context the process context for this trigger
 * @param session the session created for this trigger
 * @throws ProcessException if processing fails
 */
public abstract void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException;
} }

View File

@ -19,16 +19,12 @@ package org.apache.nifi.processors.flume;
import com.google.common.base.Throwables; import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import org.apache.flume.Context; import org.apache.flume.Context;
import org.apache.flume.EventDeliveryException; import org.apache.flume.EventDeliveryException;
import org.apache.flume.Sink; import org.apache.flume.Sink;
import org.apache.flume.Transaction;
import org.apache.flume.channel.MemoryChannel;
import org.apache.flume.conf.Configurables; import org.apache.flume.conf.Configurables;
import org.apache.jasper.compiler.JspUtil;
import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.lifecycle.OnScheduled;
@ -36,7 +32,6 @@ import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.Validator; import org.apache.nifi.components.Validator;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext; import org.apache.nifi.processor.ProcessorInitializationContext;
@ -44,18 +39,14 @@ import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.SchedulingContext; import org.apache.nifi.processor.SchedulingContext;
import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.flume.util.FlowFileEvent;
/** /**
* This processor runs a Flume sink * This processor runs a Flume sink
*/ */
@Tags({"flume", "hadoop", "get", "sink"}) @Tags({"flume", "hadoop", "get", "sink"})
@CapabilityDescription("Generate FlowFile data from a Flume sink") @CapabilityDescription("Write FlowFile data to a Flume sink")
public class FlumeSinkProcessor extends AbstractFlumeProcessor { public class FlumeSinkProcessor extends AbstractFlumeProcessor {
private Sink sink;
private MemoryChannel channel;
public static final PropertyDescriptor SINK_TYPE = new PropertyDescriptor.Builder() public static final PropertyDescriptor SINK_TYPE = new PropertyDescriptor.Builder()
.name("Sink Type") .name("Sink Type")
.description("The fully-qualified name of the Sink class") .description("The fully-qualified name of the Sink class")
@ -83,24 +74,19 @@ public class FlumeSinkProcessor extends AbstractFlumeProcessor {
.defaultValue("") .defaultValue("")
.addValidator(Validator.VALID) .addValidator(Validator.VALID)
.build(); .build();
public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
.name("Batch Size")
.description("The number of FlowFiles to process in a single batch")
.required(true)
.defaultValue("100")
.addValidator(StandardValidators.INTEGER_VALIDATOR)
.build();
public static final Relationship SUCCESS = new Relationship.Builder().name("success").build(); public static final Relationship SUCCESS = new Relationship.Builder().name("success").build();
public static final Relationship FAILURE = new Relationship.Builder().name("failure").build(); public static final Relationship FAILURE = new Relationship.Builder().name("failure").build();
private List<PropertyDescriptor> descriptors; private List<PropertyDescriptor> descriptors;
private Set<Relationship> relationships; private Set<Relationship> relationships;
private int batchSize;
private volatile Sink sink;
private volatile NifiSinkSessionChannel channel;
@Override @Override
protected void init(final ProcessorInitializationContext context) { protected void init(final ProcessorInitializationContext context) {
this.descriptors = ImmutableList.of(SINK_TYPE, AGENT_NAME, SOURCE_NAME, FLUME_CONFIG, BATCH_SIZE); this.descriptors = ImmutableList.of(SINK_TYPE, AGENT_NAME, SOURCE_NAME, FLUME_CONFIG);
this.relationships = ImmutableSet.of(SUCCESS, FAILURE); this.relationships = ImmutableSet.of(SUCCESS, FAILURE);
} }
@ -116,14 +102,9 @@ public class FlumeSinkProcessor extends AbstractFlumeProcessor {
@OnScheduled @OnScheduled
public void onScheduled(final SchedulingContext context) { public void onScheduled(final SchedulingContext context) {
batchSize = context.getProperty(BATCH_SIZE).asInteger();
try { try {
channel = new MemoryChannel(); channel = new NifiSinkSessionChannel(SUCCESS, FAILURE);
Context memoryChannelContext = new Context(); Configurables.configure(channel, new Context());
memoryChannelContext.put("capacity", String.valueOf(batchSize*10));
memoryChannelContext.put("transactionCapacity", String.valueOf(batchSize*10));
Configurables.configure(channel, memoryChannelContext);
channel.start(); channel.start();
sink = SINK_FACTORY.create(context.getProperty(SOURCE_NAME).getValue(), sink = SINK_FACTORY.create(context.getProperty(SOURCE_NAME).getValue(),
@ -152,42 +133,14 @@ public class FlumeSinkProcessor extends AbstractFlumeProcessor {
@Override @Override
public void onTrigger(final ProcessContext context, public void onTrigger(final ProcessContext context,
final ProcessSession session) throws ProcessException { final ProcessSession session) throws ProcessException {
List<FlowFile> flowFiles = Lists.newArrayListWithExpectedSize(batchSize);
for (int i = 0; i < batchSize; i++) {
FlowFile flowFile = session.get();
if (flowFile == null) {
break;
}
flowFiles.add(flowFile); channel.setSession(session);
}
Transaction transaction = channel.getTransaction();
try { try {
transaction.begin(); if (sink.process() == Sink.Status.BACKOFF) {
for (FlowFile flowFile : flowFiles) { context.yield();
channel.put(new FlowFileEvent(flowFile, session));
}
transaction.commit();
} catch (Throwable th) {
transaction.rollback();
throw Throwables.propagate(th);
} finally {
transaction.close();
}
try {
Sink.Status status;
do {
status = sink.process();
} while(status == Sink.Status.READY);
for (FlowFile flowFile : flowFiles) {
session.transfer(flowFile, SUCCESS);
} }
} catch (EventDeliveryException ex) { } catch (EventDeliveryException ex) {
for (FlowFile flowFile : flowFiles) { throw new ProcessException("Flume event delivery failed", ex);
session.transfer(flowFile, FAILURE);
}
} }
} }
} }

View File

@ -21,27 +21,25 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSet;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import org.apache.flume.Context; import java.util.concurrent.atomic.AtomicReference;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException; import org.apache.flume.EventDeliveryException;
import org.apache.flume.EventDrivenSource; import org.apache.flume.EventDrivenSource;
import org.apache.flume.PollableSource; import org.apache.flume.PollableSource;
import org.apache.flume.Source; import org.apache.flume.Source;
import org.apache.flume.SourceRunner;
import org.apache.flume.Transaction;
import org.apache.flume.channel.ChannelProcessor; import org.apache.flume.channel.ChannelProcessor;
import org.apache.flume.channel.MemoryChannel;
import org.apache.flume.conf.Configurables; import org.apache.flume.conf.Configurables;
import org.apache.flume.source.EventDrivenSourceRunner; import org.apache.flume.source.EventDrivenSourceRunner;
import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.annotation.lifecycle.OnUnscheduled; import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.Validator; import org.apache.nifi.components.Validator;
import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.ProcessorInitializationContext; import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.SchedulingContext; import org.apache.nifi.processor.SchedulingContext;
@ -55,43 +53,48 @@ import org.apache.nifi.processor.util.StandardValidators;
@CapabilityDescription("Generate FlowFile data from a Flume source") @CapabilityDescription("Generate FlowFile data from a Flume source")
public class FlumeSourceProcessor extends AbstractFlumeProcessor { public class FlumeSourceProcessor extends AbstractFlumeProcessor {
private Source source;
private SourceRunner runner;
private MemoryChannel channel;
public static final PropertyDescriptor SOURCE_TYPE = new PropertyDescriptor.Builder() public static final PropertyDescriptor SOURCE_TYPE = new PropertyDescriptor.Builder()
.name("Source Type") .name("Source Type")
.description("The fully-qualified name of the Source class") .description("The fully-qualified name of the Source class")
.required(true) .required(true)
.addValidator(createSourceValidator()) .addValidator(createSourceValidator())
.build(); .build();
public static final PropertyDescriptor AGENT_NAME = new PropertyDescriptor.Builder() public static final PropertyDescriptor AGENT_NAME = new PropertyDescriptor.Builder()
.name("Agent Name") .name("Agent Name")
.description("The name of the agent used in the Flume source configuration") .description("The name of the agent used in the Flume source configuration")
.required(true) .required(true)
.defaultValue("tier1") .defaultValue("tier1")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build(); .build();
public static final PropertyDescriptor SOURCE_NAME = new PropertyDescriptor.Builder() public static final PropertyDescriptor SOURCE_NAME = new PropertyDescriptor.Builder()
.name("Source Name") .name("Source Name")
.description("The name of the source used in the Flume source configuration") .description("The name of the source used in the Flume source configuration")
.required(true) .required(true)
.defaultValue("src-1") .defaultValue("src-1")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build(); .build();
public static final PropertyDescriptor FLUME_CONFIG = new PropertyDescriptor.Builder() public static final PropertyDescriptor FLUME_CONFIG = new PropertyDescriptor.Builder()
.name("Flume Configuration") .name("Flume Configuration")
.description("The Flume configuration for the source copied from the flume.properties file") .description("The Flume configuration for the source copied from the flume.properties file")
.required(true) .required(true)
.defaultValue("") .defaultValue("")
.addValidator(Validator.VALID) .addValidator(Validator.VALID)
.build(); .build();
public static final Relationship SUCCESS = new Relationship.Builder().name("success").build(); public static final Relationship SUCCESS = new Relationship.Builder().name("success")
.build();
private List<PropertyDescriptor> descriptors; private List<PropertyDescriptor> descriptors;
private Set<Relationship> relationships; private Set<Relationship> relationships;
private volatile Source source;
private final NifiSessionChannel pollableSourceChannel = new NifiSessionChannel(SUCCESS);
private final AtomicReference<ProcessSessionFactory> sessionFactoryRef = new AtomicReference<>(null);
private final AtomicReference<EventDrivenSourceRunner> runnerRef = new AtomicReference<>(null);
private final AtomicReference<NifiSessionFactoryChannel> eventDrivenSourceChannelRef = new AtomicReference<>(null);
private final AtomicReference<Boolean> stopping = new AtomicReference<>(false);
@Override @Override
protected void init(final ProcessorInitializationContext context) { protected void init(final ProcessorInitializationContext context) {
this.descriptors = ImmutableList.of(SOURCE_TYPE, AGENT_NAME, SOURCE_NAME, FLUME_CONFIG); this.descriptors = ImmutableList.of(SOURCE_TYPE, AGENT_NAME, SOURCE_NAME, FLUME_CONFIG);
@ -111,81 +114,90 @@ public class FlumeSourceProcessor extends AbstractFlumeProcessor {
@OnScheduled @OnScheduled
public void onScheduled(final SchedulingContext context) { public void onScheduled(final SchedulingContext context) {
try { try {
stopping.set(false);
source = SOURCE_FACTORY.create( source = SOURCE_FACTORY.create(
context.getProperty(SOURCE_NAME).getValue(), context.getProperty(SOURCE_NAME)
context.getProperty(SOURCE_TYPE).getValue()); .getValue(),
context.getProperty(SOURCE_TYPE)
.getValue());
String flumeConfig = context.getProperty(FLUME_CONFIG).getValue(); String flumeConfig = context.getProperty(FLUME_CONFIG)
String agentName = context.getProperty(AGENT_NAME).getValue(); .getValue();
String sourceName = context.getProperty(SOURCE_NAME).getValue(); String agentName = context.getProperty(AGENT_NAME)
.getValue();
String sourceName = context.getProperty(SOURCE_NAME)
.getValue();
Configurables.configure(source, Configurables.configure(source,
getFlumeSourceContext(flumeConfig, agentName, sourceName)); getFlumeSourceContext(flumeConfig, agentName, sourceName));
if (source instanceof EventDrivenSource) { if (source instanceof PollableSource) {
runner = new EventDrivenSourceRunner(); source.setChannelProcessor(new ChannelProcessor(
channel = new MemoryChannel(); new NifiChannelSelector(pollableSourceChannel)));
Configurables.configure(channel, new Context()); source.start();
channel.start();
source.setChannelProcessor(new ChannelProcessor(new NifiChannelSelector(channel)));
runner.setSource(source);
runner.start();
} }
} catch (Throwable th) { } catch (Throwable th) {
getLogger().error("Error creating source", th); getLogger()
.error("Error creating source", th);
throw Throwables.propagate(th); throw Throwables.propagate(th);
} }
} }
@OnUnscheduled @OnUnscheduled
public void unScheduled() { public void unScheduled() {
if (runner != null) { stopping.set(true);
runner.stop(); if (source instanceof PollableSource) {
source.stop();
} else {
EventDrivenSourceRunner runner = runnerRef.get();
if (runner != null) {
runner.stop();
runnerRef.compareAndSet(runner, null);
}
NifiSessionFactoryChannel eventDrivenSourceChannel = eventDrivenSourceChannelRef.get();
if (eventDrivenSourceChannel != null) {
eventDrivenSourceChannel.stop();
eventDrivenSourceChannelRef.compareAndSet(eventDrivenSourceChannel, null);
}
} }
if (channel != null) { }
channel.stop();
/**
 * Clears the remembered session factory.
 *
 * <p>The session-factory {@code onTrigger} in this class only creates and starts the
 * event-driven runner when the previously stored factory is {@code null}, so resetting the
 * reference here lets that set-up run again on a later trigger.
 */
@OnStopped
public void stopped() {
sessionFactoryRef.set(null);
}
@Override
public void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
if (source instanceof PollableSource) {
super.onTrigger(context, sessionFactory);
} else if (source instanceof EventDrivenSource) {
ProcessSessionFactory old = sessionFactoryRef.getAndSet(sessionFactory);
if (old == null) {
runnerRef.set(new EventDrivenSourceRunner());
eventDrivenSourceChannelRef.set(new NifiSessionFactoryChannel(sessionFactoryRef.get(), SUCCESS));
eventDrivenSourceChannelRef.get()
.start();
source.setChannelProcessor(new ChannelProcessor(new NifiChannelSelector(
eventDrivenSourceChannelRef.get())));
runnerRef.get()
.setSource(source);
runnerRef.get()
.start();
}
} }
} }
@Override @Override
public void onTrigger(final ProcessContext context, public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
final ProcessSession session) throws ProcessException { if (source instanceof PollableSource) {
if (source instanceof EventDrivenSource) { PollableSource pollableSource = (PollableSource) source;
onEventDrivenTrigger(context, session); try {
} else if (source instanceof PollableSource) { pollableSourceChannel.setSession(session);
onPollableTrigger((PollableSource) source, context, session); pollableSource.process();
} } catch (EventDeliveryException ex) {
} throw new ProcessException("Error processing pollable source", ex);
public void onPollableTrigger(final PollableSource pollableSource,
final ProcessContext context, final ProcessSession session)
throws ProcessException {
try {
pollableSource.setChannelProcessor(new ChannelProcessor(
new NifiChannelSelector(new NifiChannel(session, SUCCESS))));
pollableSource.start();
pollableSource.process();
pollableSource.stop();
} catch (EventDeliveryException ex) {
throw new ProcessException("Error processing pollable source", ex);
}
}
public void onEventDrivenTrigger(final ProcessContext context, final ProcessSession session) {
Transaction transaction = channel.getTransaction();
transaction.begin();
try {
Event event = channel.take();
if (event != null) {
transferEvent(event, session, SUCCESS);
} }
transaction.commit();
} catch (Throwable th) {
transaction.rollback();
throw Throwables.propagate(th);
} finally {
transaction.close();
} }
} }
} }

View File

@ -22,24 +22,26 @@ import org.apache.flume.channel.BasicTransactionSemantics;
import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.Relationship;
public class NifiSessionChannel extends BasicChannelSemantics {
public class NifiChannel extends BasicChannelSemantics { private ProcessSession session;
private final ProcessSession session; private final Relationship relationship;
private final Relationship relationship;
public NifiChannel(ProcessSession session, Relationship relationship) { public NifiSessionChannel(Relationship relationship) {
this.session = session; this.relationship = relationship;
this.relationship = relationship; }
}
@Override public void setSession(ProcessSession session) {
protected BasicTransactionSemantics createTransaction() { this.session = session;
return new NifiTransaction(session, relationship); }
}
@Override @Override
public void configure(Context context) { protected BasicTransactionSemantics createTransaction() {
} return new NifiTransaction(session, relationship);
}
@Override
public void configure(Context context) {
}
} }

View File

@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.flume;
import org.apache.flume.ChannelFullException;
import org.apache.flume.Context;
import org.apache.flume.channel.BasicChannelSemantics;
import org.apache.flume.channel.BasicTransactionSemantics;
import org.apache.flume.lifecycle.LifecycleState;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.Relationship;
/**
 * Flume channel whose transactions are each backed by a new session created from a
 * {@link ProcessSessionFactory}.
 */
public class NifiSessionFactoryChannel extends BasicChannelSemantics {

    private final ProcessSessionFactory sessionFactory;
    private final Relationship relationship;

    /**
     * @param sessionFactory factory asked for a new session every time a transaction is created
     * @param relationship relationship handed to every transaction this channel creates
     */
    public NifiSessionFactoryChannel(ProcessSessionFactory sessionFactory, Relationship relationship) {
        this.relationship = relationship;
        this.sessionFactory = sessionFactory;
    }

    /**
     * Builds a {@code NifiTransaction} around a newly created session.
     *
     * @return the new transaction
     * @throws ChannelFullException if the channel's lifecycle state is {@code STOP}
     */
    @Override
    protected BasicTransactionSemantics createTransaction() {
        // Refuse new transactions once the channel has been stopped.
        if (LifecycleState.STOP == getLifecycleState()) {
            throw new ChannelFullException("Can't write to a stopped channel");
        }
        return new NifiTransaction(sessionFactory.createSession(), relationship);
    }

    /**
     * Intentionally empty: nothing is read from {@code context}.
     */
    @Override
    public void configure(Context context) {
    }
}

View File

@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.flume;
import org.apache.flume.Context;
import org.apache.flume.channel.BasicChannelSemantics;
import org.apache.flume.channel.BasicTransactionSemantics;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
/**
 * Flume channel whose transactions operate on a caller-supplied {@link ProcessSession}.
 *
 * <p>The session is not fixed at construction time; it is replaced through
 * {@link #setSession(ProcessSession)}, and each new transaction is created with whichever
 * session was set most recently.
 */
public class NifiSinkSessionChannel extends BasicChannelSemantics {

    private final Relationship success;
    private final Relationship failure;
    private ProcessSession session;

    /**
     * @param success relationship passed to each transaction as its success relationship
     * @param failure relationship passed to each transaction as its failure relationship
     */
    public NifiSinkSessionChannel(Relationship success, Relationship failure) {
        this.failure = failure;
        this.success = success;
    }

    /**
     * Replaces the session used by transactions created from now on.
     *
     * @param session the session to hand to subsequently created transactions
     */
    public void setSession(ProcessSession session) {
        this.session = session;
    }

    /**
     * Builds a {@code NifiSinkTransaction} around the current session and the two relationships.
     * If no session has been set yet, the transaction is created with a {@code null} session.
     *
     * @return the new transaction
     */
    @Override
    protected BasicTransactionSemantics createTransaction() {
        return new NifiSinkTransaction(session, success, failure);
    }

    /**
     * Intentionally empty: nothing is read from {@code context}.
     */
    @Override
    public void configure(Context context) {
    }
}

View File

@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.flume;
import java.util.ArrayList;
import java.util.List;
import org.apache.flume.Event;
import org.apache.flume.channel.BasicTransactionSemantics;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processors.flume.util.FlowFileEvent;
/**
 * Flume transaction backed by a NiFi {@link ProcessSession}.
 *
 * <p>Flow files pulled from the session by {@link #doTake()} are remembered so
 * that they can be routed when the transaction ends: to the success
 * relationship on commit, to the failure relationship on rollback.
 */
class NifiSinkTransaction extends BasicTransactionSemantics {

    private final ProcessSession processSession;
    private final Relationship successRelationship;
    private final Relationship failureRelationship;

    // Flow files taken during this transaction, in the order they were taken.
    private final List<FlowFile> takenFlowFiles = new ArrayList<>();

    /**
     * @param session the session flow files are taken from and transferred through
     * @param success relationship used for puts and for taken flow files on commit
     * @param failure relationship used for taken flow files on rollback
     */
    public NifiSinkTransaction(ProcessSession session, Relationship success, Relationship failure) {
        this.processSession = session;
        this.successRelationship = success;
        this.failureRelationship = failure;
    }

    /**
     * Hands the event to {@code AbstractFlumeProcessor.transferEvent} together
     * with the session and the success relationship.
     */
    @Override
    protected void doPut(Event event) throws InterruptedException {
        AbstractFlumeProcessor.transferEvent(event, processSession, successRelationship);
    }

    /**
     * Pulls the next flow file from the session and exposes it as a Flume event.
     *
     * @return a {@link FlowFileEvent} wrapping the flow file, or {@code null}
     *         when the session returned no flow file
     */
    @Override
    protected Event doTake() throws InterruptedException {
        final FlowFile next = processSession.get();
        if (next != null) {
            // Remember it so commit/rollback can route it later.
            takenFlowFiles.add(next);
            return new FlowFileEvent(next, processSession);
        }
        return null;
    }

    /**
     * Routes every taken flow file to the success relationship, then commits
     * the session.
     */
    @Override
    protected void doCommit() throws InterruptedException {
        processSession.transfer(takenFlowFiles, successRelationship);
        processSession.commit();
    }

    /**
     * Routes every taken flow file to the failure relationship, then commits
     * the session (the session itself is committed, not rolled back).
     */
    @Override
    protected void doRollback() throws InterruptedException {
        processSession.transfer(takenFlowFiles, failureRelationship);
        processSession.commit();
    }
}

View File

@ -21,35 +21,34 @@ import org.apache.flume.channel.BasicTransactionSemantics;
import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.Relationship;
class NifiTransaction extends BasicTransactionSemantics { class NifiTransaction extends BasicTransactionSemantics {
private final ProcessSession session;
private final Relationship relationship;
public NifiTransaction(ProcessSession session, Relationship relationship) { private final ProcessSession session;
this.session = session; private final Relationship relationship;
this.relationship = relationship;
}
@Override public NifiTransaction(ProcessSession session, Relationship relationship) {
protected void doPut(Event event) throws InterruptedException { this.session = session;
AbstractFlumeProcessor.transferEvent(event, session, relationship); this.relationship = relationship;
} }
@Override @Override
protected Event doTake() throws InterruptedException { protected void doPut(Event event) throws InterruptedException {
throw new UnsupportedOperationException("Only put supported"); AbstractFlumeProcessor.transferEvent(event, session, relationship);
} }
@Override @Override
protected void doCommit() throws InterruptedException { protected Event doTake() throws InterruptedException {
session.commit(); throw new UnsupportedOperationException("Only put supported");
} }
@Override @Override
protected void doRollback() throws InterruptedException { protected void doCommit() throws InterruptedException {
session.rollback(); session.commit();
} }
@Override
protected void doRollback() throws InterruptedException {
session.rollback();
}
} }

View File

@ -126,13 +126,12 @@ public class FlumeSinkProcessorTest {
public void testBatchSize() throws IOException { public void testBatchSize() throws IOException {
TestRunner runner = TestRunners.newTestRunner(FlumeSinkProcessor.class); TestRunner runner = TestRunners.newTestRunner(FlumeSinkProcessor.class);
runner.setProperty(FlumeSinkProcessor.SINK_TYPE, NullSink.class.getName()); runner.setProperty(FlumeSinkProcessor.SINK_TYPE, NullSink.class.getName());
runner.setProperty(FlumeSinkProcessor.BATCH_SIZE, "1000");
runner.setProperty(FlumeSinkProcessor.FLUME_CONFIG, runner.setProperty(FlumeSinkProcessor.FLUME_CONFIG,
"tier1.sinks.sink-1.batchSize = 1000\n"); "tier1.sinks.sink-1.batchSize = 1000\n");
for (int i = 0; i < 100000; i++) { for (int i = 0; i < 100000; i++) {
runner.enqueue(String.valueOf(i).getBytes()); runner.enqueue(String.valueOf(i).getBytes());
} }
runner.run(); runner.run(100);
} }
@Test @Test

View File

@ -127,11 +127,8 @@ public class FlumeSourceProcessorTest {
runner.setProperty(FlumeSinkProcessor.FLUME_CONFIG, runner.setProperty(FlumeSinkProcessor.FLUME_CONFIG,
"tier1.sources.src-1.spoolDir = " + spoolDirectory.getAbsolutePath()); "tier1.sources.src-1.spoolDir = " + spoolDirectory.getAbsolutePath());
runner.run(); runner.run();
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(FlumeSourceProcessor.SUCCESS); // No data will be transferred because of how quickly the test runner
Assert.assertEquals(1, flowFiles.size()); // starts shutting down
for (MockFlowFile flowFile : flowFiles) { runner.assertTransferCount(FlumeSourceProcessor.SUCCESS, 0);
Assert.assertEquals(8, flowFile.getSize());
flowFile.assertContentEquals("record 1");
}
} }
} }

View File

@ -18,3 +18,5 @@ log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
log4j.appender.CONSOLE.layout.ConversionPattern=%-4r [%t] %-5p %c %x \u2013 %m%n log4j.appender.CONSOLE.layout.ConversionPattern=%-4r [%t] %-5p %c %x \u2013 %m%n
log4j.logger.org.apache.flume = DEBUG

View File

@ -18,3 +18,4 @@ org.slf4j.simpleLogger.showDateTime=true
org.slf4j.simpleLogger.dateTimeFormat=yyyy-MM-dd HH:mm:ss.SSS org.slf4j.simpleLogger.dateTimeFormat=yyyy-MM-dd HH:mm:ss.SSS
org.slf4j.simpleLogger.levelInBrackets=true org.slf4j.simpleLogger.levelInBrackets=true
org.slf4j.simpleLogger.log.org.apache.nifi.processors.flume=debug org.slf4j.simpleLogger.log.org.apache.nifi.processors.flume=debug
org.slf4j.simpleLogger.log.org.apache.flume=debug

View File

@ -17,10 +17,10 @@
<parent> <parent>
<groupId>org.apache.nifi</groupId> <groupId>org.apache.nifi</groupId>
<artifactId>nifi-nar-bundles</artifactId> <artifactId>nifi-nar-bundles</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version> <version>0.2.0-incubating-SNAPSHOT</version>
</parent> </parent>
<artifactId>nifi-flume-bundle</artifactId> <artifactId>nifi-flume-bundle</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version> <version>0.2.0-incubating-SNAPSHOT</version>
<packaging>pom</packaging> <packaging>pom</packaging>
<description>A bundle of processors that run Flume sources/sinks</description> <description>A bundle of processors that run Flume sources/sinks</description>
<modules> <modules>