Fix poms, versions, add batching to sink processor

* Fix pom issues caused by the rebase.
* Update the Flume bundle's version to 0.1.0.
* Add support for batching to the sink processor (see the sketch below).

Signed-off-by: Matt Gilman <matt.c.gilman@gmail.com>
Author: Joey Echeverria (2015-04-07 17:18:45 -07:00), committed by Matt Gilman
parent cf29029a4d
commit 3b9e482466
8 changed files with 61 additions and 20 deletions
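
In short: instead of pulling one FlowFile per trigger, FlumeSinkProcessor now drains up to Batch Size FlowFiles from the session, stages them in the Flume MemoryChannel inside a single channel transaction, and then runs the sink repeatedly until it stops reporting READY. Below is a minimal, standalone sketch of that pattern in plain Java; all names are hypothetical stand-ins (a Queue models both the NiFi session and the MemoryChannel), not the NiFi or Flume APIs that appear in the diffs that follow.

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical, dependency-free model of the batching pattern this commit adds.
public class BatchingSketch {
    static final int BATCH_SIZE = 100;

    public static void main(String[] args) {
        Queue<String> session = new ArrayDeque<>();   // stands in for the NiFi session queue
        for (int i = 0; i < 250; i++) {
            session.add("flowfile-" + i);
        }
        Queue<String> channel = new ArrayDeque<>();   // stands in for Flume's MemoryChannel

        while (!session.isEmpty()) {                  // one iteration ~= one onTrigger call
            // 1. Drain at most BATCH_SIZE items from the session.
            List<String> batch = new ArrayList<>(BATCH_SIZE);
            for (int i = 0; i < BATCH_SIZE && !session.isEmpty(); i++) {
                batch.add(session.poll());
            }
            // 2. Stage the whole batch in the channel in one "transaction".
            channel.addAll(batch);
            // 3. Run the sink until the channel is drained, mirroring
            //    do { status = sink.process(); } while (status == READY);
            while (channel.poll() != null) {
                // a real sink would deliver the event here
            }
            System.out.println("delivered a batch of " + batch.size());
        }
    }
}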

--- a/nifi-assembly/pom.xml
+++ b/nifi-assembly/pom.xml
@@ -165,6 +165,8 @@ language governing permissions and limitations under the License. -->
         <dependency>
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-kite-nar</artifactId>
+            <type>nar</type>
+        </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-flume-nar</artifactId>

--- a/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-nar/pom.xml
+++ b/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-nar/pom.xml
@@ -17,10 +17,10 @@
     <parent>
         <groupId>org.apache.nifi</groupId>
         <artifactId>nifi-flume-bundle</artifactId>
-        <version>0.0.1-incubating-SNAPSHOT</version>
+        <version>0.1.0-incubating-SNAPSHOT</version>
     </parent>
     <artifactId>nifi-flume-nar</artifactId>
-    <version>0.0.1-incubating-SNAPSHOT</version>
+    <version>0.1.0-incubating-SNAPSHOT</version>
    <packaging>nar</packaging>
    <dependencies>
        <dependency>

--- a/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/pom.xml
@@ -17,7 +17,7 @@
     <parent>
         <groupId>org.apache.nifi</groupId>
         <artifactId>nifi-flume-bundle</artifactId>
-        <version>0.0.1-incubating-SNAPSHOT</version>
+        <version>0.1.0-incubating-SNAPSHOT</version>
     </parent>
    <artifactId>nifi-flume-processors</artifactId>
    <packaging>jar</packaging>

--- a/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/AbstractFlumeProcessor.java
+++ b/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/AbstractFlumeProcessor.java
@@ -20,20 +20,15 @@ import com.google.common.collect.Maps;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.io.StringReader;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
-import org.apache.flume.SinkFactory;
-import org.apache.flume.Source;
-import org.apache.flume.SourceFactory;
-import org.apache.flume.sink.DefaultSinkFactory;
-import org.apache.flume.source.DefaultSourceFactory;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.components.Validator;
@@ -42,7 +37,6 @@ import org.apache.nifi.processor.AbstractProcessor;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.io.OutputStreamCallback;
-import static org.apache.nifi.processors.flume.FlumeSourceProcessor.FLUME_CONFIG;
 import org.apache.nifi.processors.flume.util.FlowFileEvent;

 /**

--- a/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSinkProcessor.java
+++ b/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSinkProcessor.java
@@ -19,6 +19,7 @@ package org.apache.nifi.processors.flume;
 import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
 import java.util.List;
 import java.util.Set;
 import org.apache.flume.Context;
@@ -27,6 +28,7 @@ import org.apache.flume.Sink;
 import org.apache.flume.Transaction;
 import org.apache.flume.channel.MemoryChannel;
 import org.apache.flume.conf.Configurables;
+import org.apache.jasper.compiler.JspUtil;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
@@ -81,16 +83,24 @@ public class FlumeSinkProcessor extends AbstractFlumeProcessor {
             .defaultValue("")
             .addValidator(Validator.VALID)
             .build();
+    public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
+            .name("Batch Size")
+            .description("The number of FlowFiles to process in a single batch")
+            .required(true)
+            .defaultValue("100")
+            .addValidator(StandardValidators.INTEGER_VALIDATOR)
+            .build();

     public static final Relationship SUCCESS = new Relationship.Builder().name("success").build();
     public static final Relationship FAILURE = new Relationship.Builder().name("failure").build();

     private List<PropertyDescriptor> descriptors;
     private Set<Relationship> relationships;
+    private int batchSize;

     @Override
     protected void init(final ProcessorInitializationContext context) {
-        this.descriptors = ImmutableList.of(SINK_TYPE, AGENT_NAME, SOURCE_NAME, FLUME_CONFIG);
+        this.descriptors = ImmutableList.of(SINK_TYPE, AGENT_NAME, SOURCE_NAME, FLUME_CONFIG, BATCH_SIZE);
         this.relationships = ImmutableSet.of(SUCCESS, FAILURE);
     }
@@ -106,9 +116,14 @@ public class FlumeSinkProcessor extends AbstractFlumeProcessor {
     @OnScheduled
     public void onScheduled(final SchedulingContext context) {
+        batchSize = context.getProperty(BATCH_SIZE).asInteger();
+
         try {
             channel = new MemoryChannel();
-            Configurables.configure(channel, new Context());
+            Context memoryChannelContext = new Context();
+            memoryChannelContext.put("capacity", String.valueOf(batchSize*10));
+            memoryChannelContext.put("transactionCapacity", String.valueOf(batchSize*10));
+            Configurables.configure(channel, memoryChannelContext);
             channel.start();

             sink = SINK_FACTORY.create(context.getProperty(SOURCE_NAME).getValue(),
@@ -137,12 +152,22 @@ public class FlumeSinkProcessor extends AbstractFlumeProcessor {
     @Override
     public void onTrigger(final ProcessContext context,
             final ProcessSession session) throws ProcessException {
-        FlowFile flowFile = session.get();
+        List<FlowFile> flowFiles = Lists.newArrayListWithExpectedSize(batchSize);
+        for (int i = 0; i < batchSize; i++) {
+            FlowFile flowFile = session.get();
+            if (flowFile == null) {
+                break;
+            }
+            flowFiles.add(flowFile);
+        }

         Transaction transaction = channel.getTransaction();
         try {
             transaction.begin();
-            channel.put(new FlowFileEvent(flowFile, session));
+            for (FlowFile flowFile : flowFiles) {
+                channel.put(new FlowFileEvent(flowFile, session));
+            }
             transaction.commit();
         } catch (Throwable th) {
             transaction.rollback();
@@ -152,10 +177,17 @@ public class FlumeSinkProcessor extends AbstractFlumeProcessor {
         }

         try {
-            sink.process();
-            session.transfer(flowFile, SUCCESS);
+            Sink.Status status;
+            do {
+                status = sink.process();
+            } while(status == Sink.Status.READY);
+            for (FlowFile flowFile : flowFiles) {
+                session.transfer(flowFile, SUCCESS);
+            }
         } catch (EventDeliveryException ex) {
-            session.transfer(flowFile, FAILURE);
+            for (FlowFile flowFile : flowFiles) {
+                session.transfer(flowFile, FAILURE);
+            }
         }
     }
 }
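
Two details in the sink changes above are worth calling out. First, the MemoryChannel is no longer left at its defaults: its capacity and transactionCapacity are derived from Batch Size (ten times it, presumably to leave headroom), so a full batch always fits in one channel transaction. Second, the do-while loop keeps calling sink.process() until the sink returns a status other than READY, which lets a sink whose own internal batch size is smaller than the processor's drain the channel completely before the FlowFiles are routed to success; the new testBatchSize test below pairs the two by setting both the processor's Batch Size and the Flume sink's batchSize to 1000.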

--- a/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/FlumeSinkProcessorTest.java
+++ b/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/FlumeSinkProcessorTest.java
@@ -117,6 +117,19 @@ public class FlumeSinkProcessorTest {
         fis.close();
     }

+    @Test
+    public void testBatchSize() throws IOException {
+        TestRunner runner = TestRunners.newTestRunner(FlumeSinkProcessor.class);
+        runner.setProperty(FlumeSinkProcessor.SINK_TYPE, NullSink.class.getName());
+        runner.setProperty(FlumeSinkProcessor.BATCH_SIZE, "1000");
+        runner.setProperty(FlumeSinkProcessor.FLUME_CONFIG,
+            "tier1.sinks.sink-1.batchSize = 1000\n");
+        for (int i = 0; i < 100000; i++) {
+            runner.enqueue(String.valueOf(i).getBytes());
+        }
+        runner.run();
+    }
+
     @Test
     public void testHdfsSink() throws IOException {
         File destDir = new File("target/hdfs");

--- a/nifi-nar-bundles/nifi-flume-bundle/pom.xml
+++ b/nifi-nar-bundles/nifi-flume-bundle/pom.xml
@@ -17,10 +17,10 @@
     <parent>
         <groupId>org.apache.nifi</groupId>
         <artifactId>nifi-nar-bundles</artifactId>
-        <version>0.0.1-incubating-SNAPSHOT</version>
+        <version>0.1.0-incubating-SNAPSHOT</version>
     </parent>
     <artifactId>nifi-flume-bundle</artifactId>
-    <version>0.0.1-incubating-SNAPSHOT</version>
+    <version>0.1.0-incubating-SNAPSHOT</version>
    <packaging>pom</packaging>
    <description>A bundle of processors that run Flume sources/sinks</description>
    <modules>
@@ -32,7 +32,7 @@
         <dependency>
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-flume-processors</artifactId>
-            <version>0.0.1-incubating-SNAPSHOT</version>
+            <version>0.1.0-incubating-SNAPSHOT</version>
         </dependency>
     </dependencies>
 </dependencyManagement>

--- a/pom.xml
+++ b/pom.xml
@@ -808,7 +808,7 @@
         <dependency>
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-flume-nar</artifactId>
-            <version>0.0.1-incubating-SNAPSHOT</version>
+            <version>0.1.0-incubating-SNAPSHOT</version>
             <type>nar</type>
         </dependency>
         <dependency>