Added processors that can run Flume sources and Flume sinks.

Signed-off-by: Matt Gilman <matt.c.gilman@gmail.com>
Joey Echeverria 2015-01-28 16:30:49 -08:00 committed by Matt Gilman
parent 483b3dddfb
commit b251ab4425
19 changed files with 1330 additions and 0 deletions

View File

@@ -0,0 +1,31 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-flume-bundle</artifactId>
<version>0.0.1-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-flume-nar</artifactId>
<version>0.0.1-incubating-SNAPSHOT</version>
<packaging>nar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-flume-processors</artifactId>
</dependency>
</dependencies>
</project>

View File

@@ -0,0 +1,126 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-flume-bundle</artifactId>
<version>0.0.1-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-flume-processors</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-utils</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-processor-utils</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-flowfile-packager</artifactId>
</dependency>
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-sdk</artifactId>
<version>1.5.2</version>
</dependency>
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.5.2</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- Flume Sources -->
<dependency>
<groupId>org.apache.flume.flume-ng-sources</groupId>
<artifactId>flume-twitter-source</artifactId>
<version>1.5.2</version>
</dependency>
<dependency>
<groupId>org.apache.flume.flume-ng-sources</groupId>
<artifactId>flume-jms-source</artifactId>
<version>1.5.2</version>
</dependency>
<dependency>
<groupId>org.apache.flume.flume-ng-sources</groupId>
<artifactId>flume-scribe-source</artifactId>
<version>1.5.2</version>
</dependency>
<!-- Flume Sinks -->
<dependency>
<groupId>org.apache.flume.flume-ng-sinks</groupId>
<artifactId>flume-hdfs-sink</artifactId>
<version>1.5.2</version>
</dependency>
<!-- HDFS sink dependencies -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flume.flume-ng-sinks</groupId>
<artifactId>flume-irc-sink</artifactId>
<version>1.5.2</version>
</dependency>
<dependency>
<groupId>org.apache.flume.flume-ng-sinks</groupId>
<artifactId>flume-ng-elasticsearch-sink</artifactId>
<version>1.5.2</version>
</dependency>
<dependency>
<groupId>org.apache.flume.flume-ng-sinks</groupId>
<artifactId>flume-ng-hbase-sink</artifactId>
<version>1.5.2</version>
</dependency>
<dependency>
<groupId>org.apache.flume.flume-ng-sinks</groupId>
<artifactId>flume-ng-morphline-solr-sink</artifactId>
<version>1.5.2</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@@ -0,0 +1,134 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.flume;
import com.google.common.collect.Maps;
import java.io.IOException;
import java.io.OutputStream;
import java.io.StringReader;
import java.util.Map;
import java.util.Properties;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.SinkFactory;
import org.apache.flume.SourceFactory;
import org.apache.flume.sink.DefaultSinkFactory;
import org.apache.flume.source.DefaultSourceFactory;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processors.flume.util.FlowFileEvent;
/**
* This is a base class that is helpful when building processors interacting
* with Flume.
*/
public abstract class AbstractFlumeProcessor extends AbstractProcessor {
protected static final SourceFactory SOURCE_FACTORY = new DefaultSourceFactory();
protected static final SinkFactory SINK_FACTORY = new DefaultSinkFactory();
protected static Event flowFileToEvent(FlowFile flowFile, ProcessSession session) {
return new FlowFileEvent(flowFile, session);
}
protected static void transferEvent(final Event event, ProcessSession session,
Relationship relationship) {
FlowFile flowFile = session.create();
flowFile = session.putAllAttributes(flowFile, event.getHeaders());
flowFile = session.write(flowFile, new OutputStreamCallback() {
@Override
public void process(final OutputStream out) throws IOException {
out.write(event.getBody());
}
});
session.getProvenanceReporter().create(flowFile);
session.transfer(flowFile, relationship);
}
protected static Validator createSourceValidator() {
return new Validator() {
@Override
public ValidationResult validate(final String subject, final String value, final ValidationContext context) {
String reason = null;
try {
SOURCE_FACTORY.create("NiFi Source", value);
} catch (Exception ex) {
reason = ex.getLocalizedMessage();
if (reason == null || reason.isEmpty()) {
reason = ex.toString();
} else {
reason = Character.toLowerCase(reason.charAt(0)) + reason.substring(1);
}
}
return new ValidationResult.Builder().subject(subject).input(value).explanation(reason).valid(reason == null).build();
}
};
}
protected static Validator createSinkValidator() {
return new Validator() {
@Override
public ValidationResult validate(final String subject, final String value, final ValidationContext context) {
String reason = null;
try {
SINK_FACTORY.create("NiFi Sink", value);
} catch (Exception ex) {
reason = ex.getLocalizedMessage();
if (reason == null || reason.isEmpty()) {
reason = ex.toString();
} else {
reason = Character.toLowerCase(reason.charAt(0)) + reason.substring(1);
}
}
return new ValidationResult.Builder().subject(subject).input(value).explanation(reason).valid(reason == null).build();
}
};
}
protected static Context getFlumeContext(String flumeConfig, String prefix) {
Properties flumeProperties = new Properties();
if (flumeConfig != null) {
try {
flumeProperties.load(new StringReader(flumeConfig));
} catch (IOException ex) {
throw new RuntimeException(ex);
}
}
Map<String, String> parameters = Maps.newHashMap();
for (String property : flumeProperties.stringPropertyNames()) {
parameters.put(property, flumeProperties.getProperty(property));
}
return new Context(new Context(parameters).getSubProperties(prefix));
}
protected static Context getFlumeSourceContext(String flumeConfig,
String agentName, String sourceName) {
return getFlumeContext(flumeConfig, agentName + ".sources." + sourceName + ".");
}
protected static Context getFlumeSinkContext(String flumeConfig,
String agentName, String sinkName) {
return getFlumeContext(flumeConfig, agentName + ".sinks." + sinkName + ".");
}
}
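
A note on the configuration handling above: getFlumeContext parses the pasted flume.properties text and strips the agent/component prefix, so the Flume component sees only its bare keys. A hypothetical same-package snippet (not part of this commit) illustrating that behavior:

package org.apache.nifi.processors.flume;

import org.apache.flume.Context;

// Hypothetical demo, not part of this commit. It lives in the same
// package so it can call the protected static helper directly.
public class FlumeContextDemo {
    public static void main(String[] args) {
        String flumeConfig = "tier1.sources.src-1.spoolDir = /var/spool/incoming\n"
                + "tier1.sources.src-1.fileHeader = true";
        Context ctx = AbstractFlumeProcessor.getFlumeSourceContext(flumeConfig, "tier1", "src-1");
        // The "tier1.sources.src-1." prefix has been stripped, leaving bare keys:
        System.out.println(ctx.getString("spoolDir"));    // /var/spool/incoming
        System.out.println(ctx.getBoolean("fileHeader")); // true
    }
}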

View File

@@ -0,0 +1,157 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.flume;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import java.util.List;
import java.util.Set;
import org.apache.flume.Context;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Sink;
import org.apache.flume.Transaction;
import org.apache.flume.channel.MemoryChannel;
import org.apache.flume.conf.Configurables;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.Validator;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.SchedulingContext;
import org.apache.nifi.processor.annotation.CapabilityDescription;
import org.apache.nifi.processor.annotation.OnScheduled;
import org.apache.nifi.processor.annotation.OnUnscheduled;
import org.apache.nifi.processor.annotation.Tags;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.flume.util.FlowFileEvent;
/**
* This processor runs a Flume sink
*/
@Tags({"flume", "hadoop", "get", "sink" })
@CapabilityDescription("Generate FlowFile data from a Flume sink")
public class FlumeSinkProcessor extends AbstractFlumeProcessor {
private Sink sink;
private MemoryChannel channel;
public static final PropertyDescriptor SINK_TYPE = new PropertyDescriptor.Builder()
.name("Sink Type")
.description("The fully-qualified name of the Sink class")
.required(true)
.addValidator(createSinkValidator())
.build();
public static final PropertyDescriptor AGENT_NAME = new PropertyDescriptor.Builder()
.name("Agent Name")
.description("The name of the agent used in the Flume sink configuration")
.required(true)
.defaultValue("tier1")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor SINK_NAME = new PropertyDescriptor.Builder()
.name("Sink Name")
.description("The name of the sink used in the Flume sink configuration")
.required(true)
.defaultValue("sink-1")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor FLUME_CONFIG = new PropertyDescriptor.Builder()
.name("Flume Configuration")
.description("The Flume configuration for the sink copied from the flume.properties file")
.required(true)
.defaultValue("")
.addValidator(Validator.VALID)
.build();
public static final Relationship SUCCESS = new Relationship.Builder().name("success").build();
public static final Relationship FAILURE = new Relationship.Builder().name("failure").build();
private List<PropertyDescriptor> descriptors;
private Set<Relationship> relationships;
@Override
protected void init(final ProcessorInitializationContext context) {
this.descriptors = ImmutableList.of(SINK_TYPE, AGENT_NAME, SINK_NAME, FLUME_CONFIG);
this.relationships = ImmutableSet.of(SUCCESS, FAILURE);
}
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return descriptors;
}
@Override
public Set<Relationship> getRelationships() {
return relationships;
}
@OnScheduled
public void onScheduled(final SchedulingContext context) {
channel = new MemoryChannel();
Configurables.configure(channel, new Context());
channel.start();
sink = SINK_FACTORY.create(context.getProperty(SINK_NAME).getValue(),
context.getProperty(SINK_TYPE).getValue());
sink.setChannel(channel);
String flumeConfig = context.getProperty(FLUME_CONFIG).getValue();
String agentName = context.getProperty(AGENT_NAME).getValue();
String sinkName = context.getProperty(SINK_NAME).getValue();
Configurables.configure(sink, getFlumeSinkContext(flumeConfig, agentName, sinkName));
sink.start();
}
@OnUnscheduled
public void unScheduled() {
sink.stop();
channel.stop();
}
@Override
public void onTrigger(final ProcessContext context,
final ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
Transaction transaction = channel.getTransaction();
try {
transaction.begin();
channel.put(new FlowFileEvent(flowFile, session));
transaction.commit();
} catch (Throwable th) {
transaction.rollback();
throw Throwables.propagate(th);
} finally {
transaction.close();
}
try {
sink.process();
session.transfer(flowFile, SUCCESS);
} catch (EventDeliveryException ex) {
session.transfer(flowFile, FAILURE);
}
}
}
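
For orientation, a minimal usage sketch of the sink processor (not part of this commit; it mirrors the NullSink unit test later in this changeset and assumes NiFi's standard test harness):

package org.apache.nifi.processors.flume;

import org.apache.flume.sink.NullSink;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;

// Hypothetical driver, not part of this commit.
public class FlumeSinkProcessorUsage {
    public static void main(String[] args) {
        TestRunner runner = TestRunners.newTestRunner(FlumeSinkProcessor.class);
        // The sink type may be a Flume alias (e.g. "null") or a class name.
        runner.setProperty(FlumeSinkProcessor.SINK_TYPE, NullSink.class.getName());
        runner.enqueue("event body".getBytes());
        // Each FlowFile is wrapped as a Flume Event, staged in the in-memory
        // channel, and consumed by sink.process().
        runner.run();
    }
}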

View File

@@ -0,0 +1,178 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.flume;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import java.util.List;
import java.util.Set;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.EventDrivenSource;
import org.apache.flume.PollableSource;
import org.apache.flume.Source;
import org.apache.flume.SourceRunner;
import org.apache.flume.Transaction;
import org.apache.flume.channel.ChannelProcessor;
import org.apache.flume.channel.MemoryChannel;
import org.apache.flume.conf.Configurables;
import org.apache.flume.source.EventDrivenSourceRunner;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.Validator;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.SchedulingContext;
import org.apache.nifi.processor.annotation.CapabilityDescription;
import org.apache.nifi.processor.annotation.OnScheduled;
import org.apache.nifi.processor.annotation.OnUnscheduled;
import org.apache.nifi.processor.annotation.Tags;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
/**
* This processor runs a Flume source
*/
@Tags({"flume", "hadoop", "get", "source" })
@CapabilityDescription("Generate FlowFile data from a Flume source")
public class FlumeSourceProcessor extends AbstractFlumeProcessor {
private Source source;
private SourceRunner runner;
private MemoryChannel channel;
public static final PropertyDescriptor SOURCE_TYPE = new PropertyDescriptor.Builder()
.name("Source Type")
.description("The fully-qualified name of the Source class")
.required(true)
.addValidator(createSourceValidator())
.build();
public static final PropertyDescriptor AGENT_NAME = new PropertyDescriptor.Builder()
.name("Agent Name")
.description("The name of the agent used in the Flume source configuration")
.required(true)
.defaultValue("tier1")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor SOURCE_NAME = new PropertyDescriptor.Builder()
.name("Source Name")
.description("The name of the source used in the Flume source configuration")
.required(true)
.defaultValue("src-1")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor FLUME_CONFIG = new PropertyDescriptor.Builder()
.name("Flume Configuration")
.description("The Flume configuration for the source copied from the flume.properties file")
.required(true)
.defaultValue("")
.addValidator(Validator.VALID)
.build();
public static final Relationship SUCCESS = new Relationship.Builder().name("success").build();
private List<PropertyDescriptor> descriptors;
private Set<Relationship> relationships;
@Override
protected void init(final ProcessorInitializationContext context) {
this.descriptors = ImmutableList.of(SOURCE_TYPE, AGENT_NAME, SOURCE_NAME, FLUME_CONFIG);
this.relationships = ImmutableSet.of(SUCCESS);
}
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return descriptors;
}
@Override
public Set<Relationship> getRelationships() {
return relationships;
}
@OnScheduled
public void onScheduled(final SchedulingContext context) {
source = SOURCE_FACTORY.create(
context.getProperty(SOURCE_NAME).getValue(),
context.getProperty(SOURCE_TYPE).getValue());
String flumeConfig = context.getProperty(FLUME_CONFIG).getValue();
String agentName = context.getProperty(AGENT_NAME).getValue();
String sourceName = context.getProperty(SOURCE_NAME).getValue();
Configurables.configure(source, getFlumeSourceContext(flumeConfig, agentName, sourceName));
if (source instanceof EventDrivenSource) {
runner = new EventDrivenSourceRunner();
channel = new MemoryChannel();
Configurables.configure(channel, new Context());
channel.start();
source.setChannelProcessor(new ChannelProcessor(new NifiChannelSelector(channel)));
runner.setSource(source);
runner.start();
}
}
@OnUnscheduled
public void unScheduled() {
if (runner != null) {
runner.stop();
}
if (channel != null) {
channel.stop();
}
}
@Override
public void onTrigger(final ProcessContext context,
final ProcessSession session) throws ProcessException {
if (source instanceof EventDrivenSource) {
onEventDrivenTrigger(context, session);
} else if (source instanceof PollableSource) {
onPollableTrigger((PollableSource)source, context, session);
}
}
public void onPollableTrigger(final PollableSource pollableSource,
final ProcessContext context, final ProcessSession session)
throws ProcessException {
try {
pollableSource.setChannelProcessor(new ChannelProcessor(
new NifiChannelSelector(new NifiChannel(session, SUCCESS))));
pollableSource.start();
pollableSource.process();
pollableSource.stop();
} catch (EventDeliveryException ex) {
throw new ProcessException("Error processing pollable source", ex);
}
}
public void onEventDrivenTrigger(final ProcessContext context, final ProcessSession session) {
Transaction transaction = channel.getTransaction();
try {
transaction.begin();
Event event = channel.take();
if (event != null) {
transferEvent(event, session, SUCCESS);
}
transaction.commit();
} finally {
transaction.close();
}
}
}
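
The source side, sketched the same way (not part of this commit; it mirrors the sequence-generator unit test later in this changeset — "seq" is Flume's alias for its sequence-generator source):

package org.apache.nifi.processors.flume;

import java.util.List;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;

// Hypothetical driver, not part of this commit.
public class FlumeSourceProcessorUsage {
    public static void main(String[] args) {
        TestRunner runner = TestRunners.newTestRunner(FlumeSourceProcessor.class);
        runner.setProperty(FlumeSourceProcessor.SOURCE_TYPE, "seq");
        // A pollable source is started, polled once, and stopped per trigger;
        // each Event it emits becomes a FlowFile on the success relationship.
        runner.run();
        List<MockFlowFile> out = runner.getFlowFilesForRelationship(FlumeSourceProcessor.SUCCESS);
        System.out.println(out.size() + " FlowFile(s) produced");
    }
}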

View File

@@ -0,0 +1,31 @@
package org.apache.nifi.processors.flume;
import org.apache.flume.Context;
import org.apache.flume.channel.BasicChannelSemantics;
import org.apache.flume.channel.BasicTransactionSemantics;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
public class NifiChannel extends BasicChannelSemantics {
private final ProcessSession session;
private final Relationship relationship;
public NifiChannel(ProcessSession session, Relationship relationship) {
this.session = session;
this.relationship = relationship;
}
@Override
protected BasicTransactionSemantics createTransaction() {
return new NifiTransaction(session, relationship);
}
@Override
public void configure(Context context) {
throw new UnsupportedOperationException("Not supported yet.");
}
}

View File

@@ -0,0 +1,55 @@
package org.apache.nifi.processors.flume;
import com.google.common.collect.ImmutableList;
import java.util.List;
import org.apache.flume.Channel;
import org.apache.flume.ChannelSelector;
import org.apache.flume.Context;
import org.apache.flume.Event;
public class NifiChannelSelector implements ChannelSelector {
private String name;
private final List<Channel> requiredChannels;
private final List<Channel> optionalChannels;
public NifiChannelSelector(Channel channel) {
requiredChannels = ImmutableList.of(channel);
optionalChannels = ImmutableList.of();
}
@Override
public List<Channel> getRequiredChannels(Event event) {
return requiredChannels;
}
@Override
public List<Channel> getOptionalChannels(Event event) {
return optionalChannels;
}
@Override
public List<Channel> getAllChannels() {
return requiredChannels;
}
@Override
public void setChannels(List<Channel> channels) {
throw new UnsupportedOperationException("Not supported yet.");
}
@Override
public String getName() {
return name;
}
@Override
public void setName(String name) {
this.name = name;
}
@Override
public void configure(Context context) {
}
}

View File

@@ -0,0 +1,40 @@
package org.apache.nifi.processors.flume;
import org.apache.flume.Event;
import org.apache.flume.channel.BasicTransactionSemantics;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
class NifiTransaction extends BasicTransactionSemantics {
private final ProcessSession session;
private final Relationship relationship;
public NifiTransaction(ProcessSession session, Relationship relationship) {
this.session = session;
this.relationship = relationship;
}
@Override
protected void doPut(Event event) throws InterruptedException {
AbstractFlumeProcessor.transferEvent(event, session, relationship);
}
@Override
protected Event doTake() throws InterruptedException {
throw new UnsupportedOperationException("Only put supported");
}
@Override
protected void doCommit() throws InterruptedException {
session.commit();
}
@Override
protected void doRollback() throws InterruptedException {
session.rollback();
}
}

View File

@@ -0,0 +1,114 @@
package org.apache.nifi.processors.flume.util;
import com.google.common.collect.Maps;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.Map;
import org.apache.flume.Event;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.io.InputStreamCallback;
import static org.apache.nifi.processors.flume.util.FlowFileEventConstants.*;
import org.apache.nifi.stream.io.BufferedInputStream;
import org.apache.nifi.stream.io.StreamUtils;
public class FlowFileEvent implements Event {
private final FlowFile flowFile;
private final ProcessSession session;
private final Map<String, String> headers;
private boolean headersLoaded;
private final Object bodyLock;
private byte[] body;
private boolean bodyLoaded;
public FlowFileEvent(FlowFile flowFile, ProcessSession session) {
this.flowFile = flowFile;
this.session = session;
headers = Maps.newHashMap();
bodyLock = new Object();
bodyLoaded = false;
}
@Override
public Map<String, String> getHeaders() {
if (!headersLoaded) {
synchronized (headers) {
if (headersLoaded) {
return headers;
}
headers.putAll(flowFile.getAttributes());
headers.put(ENTRY_DATE_HEADER, Long.toString(flowFile.getEntryDate()));
headers.put(ID_HEADER, Long.toString(flowFile.getId()));
headers.put(LAST_QUEUE_DATE_HEADER, Long.toString(flowFile.getLastQueueDate()));
int i = 0;
for (String lineageIdentifier : flowFile.getLineageIdentifiers()) {
headers.put(LINEAGE_IDENTIFIERS_HEADER + "." + i, lineageIdentifier);
i++;
}
headers.put(LINEAGE_START_DATE_HEADER, Long.toString(flowFile.getLineageStartDate()));
headers.put(SIZE_HEADER, Long.toString(flowFile.getSize()));
headersLoaded = true;
}
}
return headers;
}
@Override
public void setHeaders(Map<String, String> headers) {
synchronized (this.headers) {
this.headers.clear();
this.headers.putAll(headers);
headersLoaded = true;
}
}
@Override
public byte[] getBody() {
if (bodyLoaded) {
return body;
}
synchronized (bodyLock) {
if (!bodyLoaded) {
if (flowFile.getSize() > Integer.MAX_VALUE) {
throw new RuntimeException("Can't get body of Event because the backing FlowFile is too large (" + flowFile.getSize() + " bytes)");
}
final ByteArrayOutputStream baos = new ByteArrayOutputStream((int) flowFile.getSize());
session.read(flowFile, new InputStreamCallback() {
@Override
public void process(InputStream in) throws IOException {
try (BufferedInputStream input = new BufferedInputStream(in)) {
StreamUtils.copy(input, baos);
}
}
});
body = baos.toByteArray();
bodyLoaded = true;
}
}
return body;
}
@Override
public void setBody(byte[] body) {
synchronized (bodyLock) {
this.body = Arrays.copyOf(body, body.length);
bodyLoaded = true;
}
}
}

View File

@@ -0,0 +1,25 @@
package org.apache.nifi.processors.flume.util;
public class FlowFileEventConstants {
// FlowFile#getEntryDate();
public static final String ENTRY_DATE_HEADER = "nifi.entry.date";
// FlowFile#getId();
public static final String ID_HEADER = "nifi.id";
// FlowFile#getLastQueueDate();
public static final String LAST_QUEUE_DATE_HEADER = "nifi.last.queue.date";
// FlowFile#getLineageIdentifiers();
public static final String LINEAGE_IDENTIFIERS_HEADER = "nifi.lineage.identifiers";
// FlowFile#getLineageStartDate();
public static final String LINEAGE_START_DATE_HEADER = "nifi.lineage.start.date";
// FlowFile#getSize();
public static final String SIZE_HEADER = "nifi.size";
}

View File

@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.nifi.processors.flume.FlumeSourceProcessor
org.apache.nifi.processors.flume.FlumeSinkProcessor

View File

@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.flume;
import org.junit.BeforeClass;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class AbstractFlumeTest {
private static Logger logger;
@BeforeClass
public static void setUpClass() {
System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "info");
System.setProperty("org.slf4j.simpleLogger.showDateTime", "true");
System.setProperty("org.slf4j.simpleLogger.log.nifi.processors.flume", "debug");
logger = LoggerFactory.getLogger(AbstractFlumeTest.class);
}
}

View File

@@ -0,0 +1,154 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.flume;
import static org.junit.Assert.assertEquals;
import java.io.File;
import java.io.FileInputStream;
import java.io.FilenameFilter;
import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import org.apache.commons.io.filefilter.HiddenFileFilter;
import org.apache.flume.sink.NullSink;
import org.apache.flume.source.AvroSource;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.util.MockProcessContext;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.apache.nifi.util.file.FileUtils;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class FlumeSinkProcessorTest {
private static final Logger logger =
LoggerFactory.getLogger(FlumeSinkProcessorTest.class);
@Test
public void testValidators() {
TestRunner runner = TestRunners.newTestRunner(FlumeSinkProcessor.class);
Collection<ValidationResult> results;
ProcessContext pc;
results = new HashSet<>();
runner.enqueue(new byte[0]);
pc = runner.getProcessContext();
if (pc instanceof MockProcessContext) {
results = ((MockProcessContext) pc).validate();
}
Assert.assertEquals(1, results.size());
for (ValidationResult vr : results) {
logger.error(vr.toString());
Assert.assertTrue(vr.toString().contains("is invalid because Sink Type is required"));
}
// non-existent class
results = new HashSet<>();
runner.setProperty(FlumeSinkProcessor.SINK_TYPE, "invalid.class.name");
runner.enqueue(new byte[0]);
pc = runner.getProcessContext();
if (pc instanceof MockProcessContext) {
results = ((MockProcessContext) pc).validate();
}
Assert.assertEquals(1, results.size());
for (ValidationResult vr : results) {
logger.error(vr.toString());
Assert.assertTrue(vr.toString().contains("is invalid because unable to load sink"));
}
// class doesn't implement Sink
results = new HashSet<>();
runner.setProperty(FlumeSinkProcessor.SINK_TYPE, AvroSource.class.getName());
runner.enqueue(new byte[0]);
pc = runner.getProcessContext();
if (pc instanceof MockProcessContext) {
results = ((MockProcessContext) pc).validate();
}
Assert.assertEquals(1, results.size());
for (ValidationResult vr : results) {
logger.error(vr.toString());
Assert.assertTrue(vr.toString().contains("is invalid because unable to create sink"));
}
results = new HashSet<>();
runner.setProperty(FlumeSinkProcessor.SINK_TYPE, NullSink.class.getName());
runner.enqueue(new byte[0]);
pc = runner.getProcessContext();
if (pc instanceof MockProcessContext) {
results = ((MockProcessContext) pc).validate();
}
Assert.assertEquals(0, results.size());
}
@Test
public void testNullSink() throws IOException {
TestRunner runner = TestRunners.newTestRunner(FlumeSinkProcessor.class);
runner.setProperty(FlumeSinkProcessor.SINK_TYPE, NullSink.class.getName());
FileInputStream fis = new FileInputStream("src/test/resources/testdata/records.txt");
Map<String, String> attributes = new HashMap<>();
attributes.put(CoreAttributes.FILENAME.key(), "records.txt");
runner.enqueue(fis, attributes);
runner.run();
fis.close();
}
@Test
public void testHdfsSink() throws IOException {
File destDir = new File("target/hdfs");
if (destDir.exists()) {
FileUtils.deleteFilesInDir(destDir, null, logger);
} else {
destDir.mkdirs();
}
TestRunner runner = TestRunners.newTestRunner(FlumeSinkProcessor.class);
runner.setProperty(FlumeSinkProcessor.SINK_TYPE, "hdfs");
runner.setProperty(FlumeSinkProcessor.FLUME_CONFIG,
"tier1.sinks.sink-1.hdfs.path = " + destDir.toURI().toString() + "\n" +
"tier1.sinks.sink-1.hdfs.fileType = DataStream\n" +
"tier1.sinks.sink-1.hdfs.serializer = TEXT\n" +
"tier1.sinks.sink-1.serializer.appendNewline = false"
);
FileInputStream fis = new FileInputStream("src/test/resources/testdata/records.txt");
Map<String, String> attributes = new HashMap<>();
attributes.put(CoreAttributes.FILENAME.key(), "records.txt");
runner.enqueue(fis, attributes);
runner.run();
fis.close();
File[] files = destDir.listFiles((FilenameFilter)HiddenFileFilter.VISIBLE);
assertEquals("Unexpected number of destination files.", 1, files.length);
File dst = files[0];
byte[] expectedMd5 = FileUtils.computeMd5Digest(new File("src/test/resources/testdata/records.txt"));
byte[] actualMd5 = FileUtils.computeMd5Digest(dst);
Assert.assertArrayEquals("Destination file doesn't match source data", expectedMd5, actualMd5);
}
}

View File

@@ -0,0 +1,140 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.flume;
import java.io.File;
import java.io.IOException;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import org.apache.flume.sink.NullSink;
import org.apache.flume.source.AvroSource;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.MockProcessContext;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.apache.nifi.util.file.FileUtils;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class FlumeSourceProcessorTest {
private static final Logger logger =
LoggerFactory.getLogger(FlumeSourceProcessorTest.class);
@Test
public void testValidators() {
TestRunner runner = TestRunners.newTestRunner(FlumeSourceProcessor.class);
Collection<ValidationResult> results;
ProcessContext pc;
results = new HashSet<>();
runner.enqueue(new byte[0]);
pc = runner.getProcessContext();
if (pc instanceof MockProcessContext) {
results = ((MockProcessContext) pc).validate();
}
Assert.assertEquals(1, results.size());
for (ValidationResult vr : results) {
logger.error(vr.toString());
Assert.assertTrue(vr.toString().contains("is invalid because Source Type is required"));
}
// non-existent class
results = new HashSet<>();
runner.setProperty(FlumeSourceProcessor.SOURCE_TYPE, "invalid.class.name");
runner.enqueue(new byte[0]);
pc = runner.getProcessContext();
if (pc instanceof MockProcessContext) {
results = ((MockProcessContext) pc).validate();
}
Assert.assertEquals(1, results.size());
for (ValidationResult vr : results) {
logger.error(vr.toString());
Assert.assertTrue(vr.toString().contains("is invalid because unable to load source"));
}
// class doesn't implement Source
results = new HashSet<>();
runner.setProperty(FlumeSourceProcessor.SOURCE_TYPE, NullSink.class.getName());
runner.enqueue(new byte[0]);
pc = runner.getProcessContext();
if (pc instanceof MockProcessContext) {
results = ((MockProcessContext) pc).validate();
}
Assert.assertEquals(1, results.size());
for (ValidationResult vr : results) {
logger.error(vr.toString());
Assert.assertTrue(vr.toString().contains("is invalid because unable to create source"));
}
results = new HashSet<>();
runner.setProperty(FlumeSourceProcessor.SOURCE_TYPE, AvroSource.class.getName());
runner.enqueue(new byte[0]);
pc = runner.getProcessContext();
if (pc instanceof MockProcessContext) {
results = ((MockProcessContext) pc).validate();
}
Assert.assertEquals(0, results.size());
}
@Test
public void testSequenceSource() {
TestRunner runner = TestRunners.newTestRunner(FlumeSourceProcessor.class);
runner.setProperty(FlumeSourceProcessor.SOURCE_TYPE, "seq");
runner.run();
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(FlumeSourceProcessor.SUCCESS);
Assert.assertEquals(1, flowFiles.size());
for (MockFlowFile flowFile : flowFiles) {
logger.error(flowFile.toString());
Assert.assertEquals(1, flowFile.getSize());
}
}
@Test
public void testSourceWithConfig() throws IOException {
File spoolDirectory = new File("target/spooldir");
if (spoolDirectory.exists()) {
FileUtils.deleteFilesInDir(spoolDirectory, null, logger);
} else {
spoolDirectory.mkdirs();
}
File src = new File("src/test/resources/testdata/records.txt");
File dst = new File(spoolDirectory, "records.txt");
FileUtils.copyFile(src, dst, false, false, logger);
TestRunner runner = TestRunners.newTestRunner(FlumeSourceProcessor.class);
runner.setProperty(FlumeSourceProcessor.SOURCE_TYPE, "spooldir");
runner.setProperty(FlumeSourceProcessor.FLUME_CONFIG,
"tier1.sources.src-1.spoolDir = " + spoolDirectory.getAbsolutePath());
runner.run();
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(FlumeSourceProcessor.SUCCESS);
Assert.assertEquals(1, flowFiles.size());
for (MockFlowFile flowFile : flowFiles) {
Assert.assertEquals(8, flowFile.getSize());
flowFile.assertContentEquals("record 1");
}
}
}

View File

@@ -0,0 +1,25 @@
<?xml version="1.0"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!-- Put site-specific property overrides in this file. -->
<configuration>
<property>
<name>fs.default.name</name>
<value>hdfs://localhost:65535</value>
</property>
</configuration>

View File

@@ -0,0 +1,25 @@
<?xml version="1.0"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!-- Put site-specific property overrides in this file. -->
<configuration>
<property>
<name>fs.defaultFS</name>
<value>file:///</value>
</property>
</configuration>

View File

@@ -0,0 +1,4 @@
record 1
record 2
record 3
record 4

View File

@@ -0,0 +1,39 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-nar-bundles</artifactId>
<version>0.0.1-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-flume-bundle</artifactId>
<version>0.0.1-incubating-SNAPSHOT</version>
<packaging>pom</packaging>
<description>A bundle of processors that run Flume sources/sinks</description>
<modules>
<module>nifi-flume-processors</module>
<module>nifi-flume-nar</module>
</modules>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-flume-processors</artifactId>
<version>0.0.1-incubating-SNAPSHOT</version>
</dependency>
</dependencies>
</dependencyManagement>
</project>

View File

@@ -41,6 +41,7 @@
<module>nifi-hl7-bundle</module>
<module>nifi-language-translation-bundle</module>
<module>nifi-mongodb-bundle</module>
<module>nifi-flume-bundle</module>
</modules>
<dependencyManagement>
<dependencies>