From 3b9e48246641e617cd3fef987ce6facd283f6f3e Mon Sep 17 00:00:00 2001 From: Joey Echeverria Date: Tue, 7 Apr 2015 17:18:45 -0700 Subject: [PATCH] Fix poms, versions, add batching to sink processor * Fix pom issues caused by the rebase. * Update the Flume bundle's version to 0.1.0 * Add support for batching to the sink processor Signed-off-by: Matt Gilman --- nifi/nifi-assembly/pom.xml | 2 + .../nifi-flume-bundle/nifi-flume-nar/pom.xml | 4 +- .../nifi-flume-processors/pom.xml | 2 +- .../flume/AbstractFlumeProcessor.java | 6 --- .../processors/flume/FlumeSinkProcessor.java | 46 ++++++++++++++++--- .../flume/FlumeSinkProcessorTest.java | 13 ++++++ .../nifi-flume-bundle/pom.xml | 6 +-- nifi/pom.xml | 2 +- 8 files changed, 61 insertions(+), 20 deletions(-) diff --git a/nifi/nifi-assembly/pom.xml b/nifi/nifi-assembly/pom.xml index 4f4879fefe..0293c70013 100644 --- a/nifi/nifi-assembly/pom.xml +++ b/nifi/nifi-assembly/pom.xml @@ -165,6 +165,8 @@ language governing permissions and limitations under the License. --> org.apache.nifi nifi-kite-nar + nar + org.apache.nifi nifi-flume-nar diff --git a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-nar/pom.xml b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-nar/pom.xml index dff440eb7f..36a5170e5e 100644 --- a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-nar/pom.xml +++ b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-nar/pom.xml @@ -17,10 +17,10 @@ org.apache.nifi nifi-flume-bundle - 0.0.1-incubating-SNAPSHOT + 0.1.0-incubating-SNAPSHOT nifi-flume-nar - 0.0.1-incubating-SNAPSHOT + 0.1.0-incubating-SNAPSHOT nar diff --git a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/pom.xml b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/pom.xml index 54636caf76..bd26a9916c 100644 --- a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/pom.xml +++ b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/pom.xml @@ -17,7 +17,7 @@ org.apache.nifi nifi-flume-bundle - 0.0.1-incubating-SNAPSHOT + 0.1.0-incubating-SNAPSHOT nifi-flume-processors jar diff --git a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/AbstractFlumeProcessor.java b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/AbstractFlumeProcessor.java index 5c608d5ecd..a8310008dd 100644 --- a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/AbstractFlumeProcessor.java +++ b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/AbstractFlumeProcessor.java @@ -20,20 +20,15 @@ import com.google.common.collect.Maps; import java.io.IOException; import java.io.OutputStream; import java.io.StringReader; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; import java.util.Map; import java.util.Properties; import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.SinkFactory; -import org.apache.flume.Source; import org.apache.flume.SourceFactory; import org.apache.flume.sink.DefaultSinkFactory; import org.apache.flume.source.DefaultSourceFactory; -import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.Validator; @@ -42,7 +37,6 @@ import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.io.OutputStreamCallback; -import static org.apache.nifi.processors.flume.FlumeSourceProcessor.FLUME_CONFIG; import org.apache.nifi.processors.flume.util.FlowFileEvent; /** diff --git a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSinkProcessor.java b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSinkProcessor.java index fc97ae8685..0ffd4f1c27 100644 --- a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSinkProcessor.java +++ b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSinkProcessor.java @@ -19,6 +19,7 @@ package org.apache.nifi.processors.flume; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Lists; import java.util.List; import java.util.Set; import org.apache.flume.Context; @@ -27,6 +28,7 @@ import org.apache.flume.Sink; import org.apache.flume.Transaction; import org.apache.flume.channel.MemoryChannel; import org.apache.flume.conf.Configurables; +import org.apache.jasper.compiler.JspUtil; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnScheduled; @@ -81,16 +83,24 @@ public class FlumeSinkProcessor extends AbstractFlumeProcessor { .defaultValue("") .addValidator(Validator.VALID) .build(); + public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder() + .name("Batch Size") + .description("The number of FlowFiles to process in a single batch") + .required(true) + .defaultValue("100") + .addValidator(StandardValidators.INTEGER_VALIDATOR) + .build(); public static final Relationship SUCCESS = new Relationship.Builder().name("success").build(); public static final Relationship FAILURE = new Relationship.Builder().name("failure").build(); private List descriptors; private Set relationships; + private int batchSize; @Override protected void init(final ProcessorInitializationContext context) { - this.descriptors = ImmutableList.of(SINK_TYPE, AGENT_NAME, SOURCE_NAME, FLUME_CONFIG); + this.descriptors = ImmutableList.of(SINK_TYPE, AGENT_NAME, SOURCE_NAME, FLUME_CONFIG, BATCH_SIZE); this.relationships = ImmutableSet.of(SUCCESS, FAILURE); } @@ -106,9 +116,14 @@ public class FlumeSinkProcessor extends AbstractFlumeProcessor { @OnScheduled public void onScheduled(final SchedulingContext context) { + batchSize = context.getProperty(BATCH_SIZE).asInteger(); + try { channel = new MemoryChannel(); - Configurables.configure(channel, new Context()); + Context memoryChannelContext = new Context(); + memoryChannelContext.put("capacity", String.valueOf(batchSize*10)); + memoryChannelContext.put("transactionCapacity", String.valueOf(batchSize*10)); + Configurables.configure(channel, memoryChannelContext); channel.start(); sink = SINK_FACTORY.create(context.getProperty(SOURCE_NAME).getValue(), @@ -137,12 +152,22 @@ public class FlumeSinkProcessor extends AbstractFlumeProcessor { @Override public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { - FlowFile flowFile = session.get(); + List flowFiles = Lists.newArrayListWithExpectedSize(batchSize); + for (int i = 0; i < batchSize; i++) { + FlowFile flowFile = session.get(); + if (flowFile == null) { + break; + } + + flowFiles.add(flowFile); + } Transaction transaction = channel.getTransaction(); try { transaction.begin(); - channel.put(new FlowFileEvent(flowFile, session)); + for (FlowFile flowFile : flowFiles) { + channel.put(new FlowFileEvent(flowFile, session)); + } transaction.commit(); } catch (Throwable th) { transaction.rollback(); @@ -152,10 +177,17 @@ public class FlumeSinkProcessor extends AbstractFlumeProcessor { } try { - sink.process(); - session.transfer(flowFile, SUCCESS); + Sink.Status status; + do { + status = sink.process(); + } while(status == Sink.Status.READY); + for (FlowFile flowFile : flowFiles) { + session.transfer(flowFile, SUCCESS); + } } catch (EventDeliveryException ex) { - session.transfer(flowFile, FAILURE); + for (FlowFile flowFile : flowFiles) { + session.transfer(flowFile, FAILURE); + } } } } diff --git a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/FlumeSinkProcessorTest.java b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/FlumeSinkProcessorTest.java index 8d40cb6314..d22514f1f3 100644 --- a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/FlumeSinkProcessorTest.java +++ b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/FlumeSinkProcessorTest.java @@ -116,6 +116,19 @@ public class FlumeSinkProcessorTest { runner.run(); fis.close(); } + + @Test + public void testBatchSize() throws IOException { + TestRunner runner = TestRunners.newTestRunner(FlumeSinkProcessor.class); + runner.setProperty(FlumeSinkProcessor.SINK_TYPE, NullSink.class.getName()); + runner.setProperty(FlumeSinkProcessor.BATCH_SIZE, "1000"); + runner.setProperty(FlumeSinkProcessor.FLUME_CONFIG, + "tier1.sinks.sink-1.batchSize = 1000\n"); + for (int i = 0; i < 100000; i++) { + runner.enqueue(String.valueOf(i).getBytes()); + } + runner.run(); + } @Test public void testHdfsSink() throws IOException { diff --git a/nifi/nifi-nar-bundles/nifi-flume-bundle/pom.xml b/nifi/nifi-nar-bundles/nifi-flume-bundle/pom.xml index dc9ec69deb..50b0fde3a9 100644 --- a/nifi/nifi-nar-bundles/nifi-flume-bundle/pom.xml +++ b/nifi/nifi-nar-bundles/nifi-flume-bundle/pom.xml @@ -17,10 +17,10 @@ org.apache.nifi nifi-nar-bundles - 0.0.1-incubating-SNAPSHOT + 0.1.0-incubating-SNAPSHOT nifi-flume-bundle - 0.0.1-incubating-SNAPSHOT + 0.1.0-incubating-SNAPSHOT pom A bundle of processors that run Flume sources/sinks @@ -32,7 +32,7 @@ org.apache.nifi nifi-flume-processors - 0.0.1-incubating-SNAPSHOT + 0.1.0-incubating-SNAPSHOT diff --git a/nifi/pom.xml b/nifi/pom.xml index 422e1aac60..682a426aa5 100644 --- a/nifi/pom.xml +++ b/nifi/pom.xml @@ -808,7 +808,7 @@ org.apache.nifi nifi-flume-nar - 0.0.1-incubating-SNAPSHOT + 0.1.0-incubating-SNAPSHOT nar