mirror of https://github.com/apache/nifi.git
Initial commit for the elasticsearch bundle to Nifi
This commit is contained in:
parent
8cf34c3ea5
commit
e6cfcf40d0
|
@ -0,0 +1,24 @@
|
||||||
|
<?xml version="1.0" encoding="UTF-8"?>
|
||||||
|
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||||
|
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||||
|
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||||
|
<modelVersion>4.0.0</modelVersion>
|
||||||
|
<parent>
|
||||||
|
<artifactId>nifi-elasticsearch-bundle</artifactId>
|
||||||
|
<groupId>org.apache.nifi</groupId>
|
||||||
|
<version>0.4.1-SNAPSHOT</version>
|
||||||
|
</parent>
|
||||||
|
|
||||||
|
<groupId>gov.pnnl.nifi</groupId>
|
||||||
|
<artifactId>nifi-elasticsearch-nar</artifactId>
|
||||||
|
<packaging>nar</packaging>
|
||||||
|
|
||||||
|
<dependencies>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.nifi</groupId>
|
||||||
|
<artifactId>nifi-elasticsearch-processors</artifactId>
|
||||||
|
<version>0.4.1-SNAPSHOT</version>
|
||||||
|
</dependency>
|
||||||
|
</dependencies>
|
||||||
|
|
||||||
|
</project>
|
|
@ -0,0 +1,24 @@
|
||||||
|
nifi-elasticsearch-nar
|
||||||
|
Copyright 2015 The Apache Software Foundation
|
||||||
|
|
||||||
|
This product includes software developed at
|
||||||
|
The Apache Software Foundation (http://www.apache.org/).
|
||||||
|
|
||||||
|
******************
|
||||||
|
Apache Software License v2
|
||||||
|
******************
|
||||||
|
|
||||||
|
The following binary components are provided under the Apache Software License v2
|
||||||
|
|
||||||
|
(ASLv2) Apache Commons Lang
|
||||||
|
The following NOTICE information applies:
|
||||||
|
Apache Commons Lang
|
||||||
|
Copyright 2001-2014 The Apache Software Foundation
|
||||||
|
|
||||||
|
This product includes software from the Spring Framework,
|
||||||
|
under the Apache License 2.0 (see: StringUtils.containsWhitespace())
|
||||||
|
|
||||||
|
(ASLv2) Apache Commons IO
|
||||||
|
The following NOTICE information applies:
|
||||||
|
Apache Commons IO
|
||||||
|
Copyright 2002-2012 The Apache Software Foundation
|
|
@ -0,0 +1,63 @@
|
||||||
|
<?xml version="1.0" encoding="UTF-8"?>
|
||||||
|
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||||
|
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||||
|
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||||
|
<modelVersion>4.0.0</modelVersion>
|
||||||
|
<parent>
|
||||||
|
<artifactId>nifi-elasticsearch-bundle</artifactId>
|
||||||
|
<groupId>org.apache.nifi</groupId>
|
||||||
|
<version>0.4.1-SNAPSHOT</version>
|
||||||
|
</parent>
|
||||||
|
|
||||||
|
<groupId>org.apache.nifi</groupId>
|
||||||
|
<artifactId>nifi-elasticsearch-processors</artifactId>
|
||||||
|
|
||||||
|
<properties>
|
||||||
|
<slf4jversion>1.7.12</slf4jversion>
|
||||||
|
<es.version>1.7.1</es.version>
|
||||||
|
<gsonversion>2.4</gsonversion>
|
||||||
|
<jodatimeversion>2.9.1</jodatimeversion>
|
||||||
|
</properties>
|
||||||
|
|
||||||
|
<dependencies>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.nifi</groupId>
|
||||||
|
<artifactId>nifi-api</artifactId>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.nifi</groupId>
|
||||||
|
<artifactId>nifi-processor-utils</artifactId>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.nifi</groupId>
|
||||||
|
<artifactId>nifi-mock</artifactId>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.slf4j</groupId>
|
||||||
|
<artifactId>slf4j-api</artifactId>
|
||||||
|
<version>${slf4jversion}</version>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.slf4j</groupId>
|
||||||
|
<artifactId>slf4j-simple</artifactId>
|
||||||
|
<version>${slf4jversion}</version>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.elasticsearch</groupId>
|
||||||
|
<artifactId>elasticsearch</artifactId>
|
||||||
|
<version>${es.version}</version>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.google.code.gson</groupId>
|
||||||
|
<artifactId>gson</artifactId>
|
||||||
|
<version>${gsonversion}</version>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>joda-time</groupId>
|
||||||
|
<artifactId>joda-time</artifactId>
|
||||||
|
<version>${jodatimeversion}</version>
|
||||||
|
</dependency>
|
||||||
|
</dependencies>
|
||||||
|
|
||||||
|
</project>
|
|
@ -0,0 +1,232 @@
|
||||||
|
package org.apache.nifi.processors.elasticsearch;
|
||||||
|
|
||||||
|
import org.apache.nifi.annotation.lifecycle.OnScheduled;
|
||||||
|
import org.apache.nifi.annotation.lifecycle.OnStopped;
|
||||||
|
import org.apache.nifi.components.PropertyDescriptor;
|
||||||
|
import org.apache.nifi.processor.AbstractProcessor;
|
||||||
|
import org.apache.nifi.processor.ProcessContext;
|
||||||
|
import org.apache.nifi.processor.util.StandardValidators;
|
||||||
|
import org.elasticsearch.client.transport.TransportClient;
|
||||||
|
import org.elasticsearch.common.settings.ImmutableSettings;
|
||||||
|
import org.elasticsearch.common.transport.InetSocketTransportAddress;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.net.InetSocketAddress;
|
||||||
|
import java.nio.charset.StandardCharsets;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Locale;
|
||||||
|
|
||||||
|
import org.joda.time.DateTime;
|
||||||
|
import org.joda.time.format.DateTimeFormat;
|
||||||
|
import org.joda.time.format.DateTimeFormatter;
|
||||||
|
import org.joda.time.format.ISODateTimeFormat;
|
||||||
|
|
||||||
|
import com.google.gson.*;
|
||||||
|
|
||||||
|
public abstract class AbstractElasticSearchProcessor extends AbstractProcessor{
|
||||||
|
|
||||||
|
protected static final PropertyDescriptor CLUSTER_NAME = new PropertyDescriptor.Builder()
|
||||||
|
.name("Cluster Name")
|
||||||
|
.description("Name of the ES cluster. For example, elasticsearch_brew")
|
||||||
|
.required(true)
|
||||||
|
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||||
|
.build();
|
||||||
|
protected static final PropertyDescriptor HOSTS = new PropertyDescriptor.Builder()
|
||||||
|
.name("ElasticSearch Hosts")
|
||||||
|
.description("ElasticSearch Hosts, which should be comma separated and colon for hostname/port " +
|
||||||
|
"host1:port,host2:port,.... For example testcluster:9300")
|
||||||
|
.required(true)
|
||||||
|
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||||
|
.build();
|
||||||
|
protected static final PropertyDescriptor PING_TIMEOUT = new PropertyDescriptor.Builder()
|
||||||
|
.name("ElasticSearch Ping Timeout")
|
||||||
|
.description("The ping timeout used to determine when a node is unreachable. " +
|
||||||
|
"For example, 5s (5 seconds). If non-local recommended is 30s")
|
||||||
|
.required(true)
|
||||||
|
.defaultValue("5s")
|
||||||
|
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||||
|
.build();
|
||||||
|
protected static final PropertyDescriptor SAMPLER_INTERVAL = new PropertyDescriptor.Builder()
|
||||||
|
.name("Sampler Interval")
|
||||||
|
.description("Node sampler interval. For example, 5s (5 seconds) If non-local recommended is 30s")
|
||||||
|
.required(true)
|
||||||
|
.defaultValue("5s")
|
||||||
|
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
protected static final PropertyDescriptor INDEX_STRATEGY = new PropertyDescriptor.Builder()
|
||||||
|
.name("Index Strategy")
|
||||||
|
.description("Pick the index strategy. Yearly, Monthly, Daily, Hourly")
|
||||||
|
.required(true)
|
||||||
|
.defaultValue("Monthly")
|
||||||
|
.allowableValues("Yearly", "Monthly", "Daily", "Hourly")
|
||||||
|
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
protected TransportClient esClient;
|
||||||
|
protected List<InetSocketAddress> esHosts;
|
||||||
|
protected String indexPrefix;
|
||||||
|
protected static String indexStrategy;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Instantiate ElasticSearch Client
|
||||||
|
* @param context
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
@OnScheduled
|
||||||
|
public final void createClient(ProcessContext context) throws IOException {
|
||||||
|
if (esClient != null) {
|
||||||
|
closeClient();
|
||||||
|
}
|
||||||
|
|
||||||
|
getLogger().info("Creating ElasticSearch Client");
|
||||||
|
|
||||||
|
try {
|
||||||
|
|
||||||
|
final String clusterName = context.getProperty(CLUSTER_NAME).toString();
|
||||||
|
final String pingTimeout = context.getProperty(PING_TIMEOUT).toString();
|
||||||
|
final String samplerInterval = context.getProperty(SAMPLER_INTERVAL).toString();
|
||||||
|
indexStrategy = context.getProperty(INDEX_STRATEGY).toString();
|
||||||
|
|
||||||
|
//create new transport client
|
||||||
|
esClient = new TransportClient(
|
||||||
|
ImmutableSettings.builder()
|
||||||
|
.put("cluster.name", clusterName)
|
||||||
|
.put("client.transport.ping_timeout", pingTimeout)
|
||||||
|
.put("client.transport.nodes_sampler_interval", samplerInterval),
|
||||||
|
false);
|
||||||
|
|
||||||
|
|
||||||
|
final String hosts = context.getProperty(HOSTS).toString();
|
||||||
|
esHosts = GetEsHosts(hosts);
|
||||||
|
|
||||||
|
for (final InetSocketAddress host : esHosts) {
|
||||||
|
esClient.addTransportAddress(new InetSocketTransportAddress(host));
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
getLogger().error("Failed to schedule PutElasticSearch due to {}", new Object[] { e }, e);
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Dispose of ElasticSearch client
|
||||||
|
*/
|
||||||
|
@OnStopped
|
||||||
|
public final void closeClient() {
|
||||||
|
if (esClient != null) {
|
||||||
|
getLogger().info("Closing ElasticSearch Client");
|
||||||
|
esClient.close();
|
||||||
|
esClient = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the ElasticSearch hosts from the Nifi attribute
|
||||||
|
* @param hosts A comma separted list of ElasticSearch hosts
|
||||||
|
* @return List of InetSockeAddresses for the ES hosts
|
||||||
|
*/
|
||||||
|
private List<InetSocketAddress> GetEsHosts(String hosts){
|
||||||
|
|
||||||
|
final List<String> esList = Arrays.asList(hosts.split(","));
|
||||||
|
List<InetSocketAddress> esHosts = new ArrayList<>();
|
||||||
|
|
||||||
|
for(String item : esList){
|
||||||
|
|
||||||
|
String[] addresses = item.split(":");
|
||||||
|
final String hostName = addresses[0];
|
||||||
|
final int port = Integer.parseInt(addresses[1]);
|
||||||
|
|
||||||
|
esHosts.add(new InetSocketAddress(hostName, port));
|
||||||
|
}
|
||||||
|
|
||||||
|
return esHosts;
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get ElasticSearch index for data
|
||||||
|
* @param input
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
public String getIndex(final JsonObject input) {
|
||||||
|
|
||||||
|
return extractIndexString(input);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get ElasticSearch Type
|
||||||
|
* @param input
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
public String getType(final JsonObject input) {
|
||||||
|
return "status";
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get id for ElasticSearch
|
||||||
|
* @param input
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
public String getId(final JsonObject input) {
|
||||||
|
|
||||||
|
return input.get("id").getAsString();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get Source for ElasticSearch
|
||||||
|
* @param input
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
public byte[] getSource(final JsonObject input) {
|
||||||
|
String jsonString = input.toString();
|
||||||
|
jsonString = jsonString.replace("\r\n", " ").replace('\n', ' ').replace('\r', ' ');
|
||||||
|
return jsonString.getBytes(StandardCharsets.UTF_8);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Identify ElasticSearch index where data will land
|
||||||
|
* @param parsedJson
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
private static String extractIndexString(final JsonObject parsedJson) {
|
||||||
|
final String extractedDate = "created_at";
|
||||||
|
if(!parsedJson.has(extractedDate))
|
||||||
|
throw new IllegalArgumentException("Message is missing " + extractedDate);
|
||||||
|
|
||||||
|
final DateTimeFormatter format =
|
||||||
|
DateTimeFormat.forPattern("EEE MMM dd HH:mm:ss Z yyyy").withLocale(Locale.ENGLISH);
|
||||||
|
|
||||||
|
final String dateElement = parsedJson.get(extractedDate).getAsString();
|
||||||
|
final DateTimeFormatter isoFormat = ISODateTimeFormat.dateTime();
|
||||||
|
final DateTime dateTime = isoFormat.parseDateTime(format.parseDateTime(dateElement).toString());
|
||||||
|
|
||||||
|
final DateTimeFormatter dateFormat;
|
||||||
|
//Create ElasticSearch Index
|
||||||
|
switch (indexStrategy){
|
||||||
|
|
||||||
|
case "Yearly":
|
||||||
|
dateFormat = DateTimeFormat.forPattern("yyyy_MM");
|
||||||
|
break;
|
||||||
|
case "Monthly":
|
||||||
|
dateFormat = DateTimeFormat.forPattern("yyyy_MM");
|
||||||
|
break;
|
||||||
|
case "Daily":
|
||||||
|
dateFormat = DateTimeFormat.forPattern("yyyy_MM_dd");
|
||||||
|
break;
|
||||||
|
case "Hourly":
|
||||||
|
dateFormat = DateTimeFormat.forPattern("yyyy_MM_dd_HH");
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
throw new IllegalArgumentException("Invalid index strategy selected: " + indexStrategy);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
//ElasticSearch indexes must be lowercase
|
||||||
|
final String strategy = indexStrategy.toLowerCase() + "_" + dateFormat.print(dateTime);
|
||||||
|
return strategy;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,197 @@
|
||||||
|
package org.apache.nifi.processors.elasticsearch;
|
||||||
|
|
||||||
|
import org.apache.nifi.annotation.behavior.EventDriven;
|
||||||
|
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||||
|
import org.apache.nifi.annotation.documentation.Tags;
|
||||||
|
import org.apache.nifi.components.PropertyDescriptor;
|
||||||
|
import org.apache.nifi.flowfile.FlowFile;
|
||||||
|
import org.apache.nifi.logging.ProcessorLog;
|
||||||
|
import org.apache.nifi.processor.ProcessContext;
|
||||||
|
import org.apache.nifi.processor.ProcessSession;
|
||||||
|
import org.apache.nifi.processor.Relationship;
|
||||||
|
import org.apache.nifi.processor.exception.ProcessException;
|
||||||
|
import org.apache.nifi.processor.io.InputStreamCallback;
|
||||||
|
import org.apache.nifi.processor.util.StandardValidators;
|
||||||
|
import org.apache.nifi.processors.elasticsearch.AbstractElasticSearchProcessor;
|
||||||
|
import org.apache.nifi.stream.io.StreamUtils;
|
||||||
|
import org.elasticsearch.action.bulk.BulkItemResponse;
|
||||||
|
import org.elasticsearch.action.bulk.BulkRequestBuilder;
|
||||||
|
import org.elasticsearch.action.bulk.BulkResponse;
|
||||||
|
|
||||||
|
import org.elasticsearch.client.transport.NoNodeAvailableException;
|
||||||
|
import org.elasticsearch.*;
|
||||||
|
import org.elasticsearch.transport.ReceiveTimeoutTransportException;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.InputStream;
|
||||||
|
import java.util.*;
|
||||||
|
|
||||||
|
import com.google.gson.*;
|
||||||
|
|
||||||
|
@EventDriven
|
||||||
|
@Tags({"elasticsearch", "insert", "update", "write", "put"})
|
||||||
|
@CapabilityDescription("Writes the contents of a FlowFile to ElasticSearch")
|
||||||
|
public class PutElasticSearch extends AbstractElasticSearchProcessor {
|
||||||
|
|
||||||
|
static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
|
||||||
|
.description("All FlowFiles that are written to ElasticSearch are routed to this relationship").build();
|
||||||
|
|
||||||
|
static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
|
||||||
|
.description("All FlowFiles that cannot be written to ElasticSearch are routed to this relationship").build();
|
||||||
|
|
||||||
|
static final Relationship REL_RETRY = new Relationship.Builder()
|
||||||
|
.name("retry")
|
||||||
|
.description("A FlowFile is routed to this relationship if the database cannot be updated but attempting the operation again may succeed")
|
||||||
|
.build();
|
||||||
|
|
||||||
|
public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
|
||||||
|
.name("Batch Size")
|
||||||
|
.description("The preferred number of FlowFiles to put to the database in a single transaction")
|
||||||
|
.required(true)
|
||||||
|
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
|
||||||
|
.defaultValue("100")
|
||||||
|
.build();
|
||||||
|
|
||||||
|
private final List<PropertyDescriptor> descriptors;
|
||||||
|
|
||||||
|
private final Set<Relationship> relationships;
|
||||||
|
|
||||||
|
public PutElasticSearch() {
|
||||||
|
final List<PropertyDescriptor> descriptors = new ArrayList<>();
|
||||||
|
descriptors.add(CLUSTER_NAME);
|
||||||
|
descriptors.add(HOSTS);
|
||||||
|
descriptors.add(PING_TIMEOUT);
|
||||||
|
descriptors.add(SAMPLER_INTERVAL);
|
||||||
|
descriptors.add(BATCH_SIZE);
|
||||||
|
descriptors.add(INDEX_STRATEGY);
|
||||||
|
this.descriptors = Collections.unmodifiableList(descriptors);
|
||||||
|
|
||||||
|
final Set<Relationship> relationships = new HashSet<>();
|
||||||
|
relationships.add(REL_SUCCESS);
|
||||||
|
relationships.add(REL_FAILURE);
|
||||||
|
relationships.add(REL_RETRY);
|
||||||
|
this.relationships = Collections.unmodifiableSet(relationships);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Set<Relationship> getRelationships() {
|
||||||
|
return this.relationships;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||||
|
return descriptors;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
|
||||||
|
final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
|
||||||
|
final List<FlowFile> flowFiles = session.get(batchSize);
|
||||||
|
if (flowFiles.isEmpty()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
final ProcessorLog logger = getLogger();
|
||||||
|
|
||||||
|
try {
|
||||||
|
final BulkRequestBuilder bulk = GetEsBulkRequest(session, flowFiles);
|
||||||
|
final BulkResponse response = bulk.execute().actionGet();
|
||||||
|
if (response.hasFailures()) {
|
||||||
|
for (final BulkItemResponse item : response.getItems()) {
|
||||||
|
final FlowFile flowFile = flowFiles.get(item.getItemId());
|
||||||
|
if (item.isFailed()) {
|
||||||
|
logger.error("Failed to insert {} into ElasticSearch due to {}",
|
||||||
|
new Object[]{flowFile, item.getFailure()}, new Exception());
|
||||||
|
session.transfer(flowFile, REL_FAILURE);
|
||||||
|
|
||||||
|
} else {
|
||||||
|
session.transfer(flowFile, REL_SUCCESS);
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
for (final FlowFile flowFile : flowFiles) {
|
||||||
|
session.transfer(flowFile, REL_SUCCESS);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
} catch (NoNodeAvailableException nne) {
|
||||||
|
logger.error("Failed to insert {} into ElasticSearch No Node Available {}", new Object[]{nne}, nne);
|
||||||
|
for (final FlowFile flowFile : flowFiles) {
|
||||||
|
session.transfer(flowFile, REL_RETRY);
|
||||||
|
}
|
||||||
|
context.yield();
|
||||||
|
|
||||||
|
} catch (ElasticsearchTimeoutException ete) {
|
||||||
|
logger.error("Failed to insert {} into ElasticSearch Timeout to {}", new Object[]{ete}, ete);
|
||||||
|
for (final FlowFile flowFile : flowFiles) {
|
||||||
|
session.transfer(flowFile, REL_RETRY);
|
||||||
|
}
|
||||||
|
context.yield();
|
||||||
|
|
||||||
|
} catch (ReceiveTimeoutTransportException rtt) {
|
||||||
|
logger.error("Failed to insert {} into ElasticSearch ReceiveTimeoutTransportException to {}", new Object[]{rtt}, rtt);
|
||||||
|
for (final FlowFile flowFile : flowFiles) {
|
||||||
|
session.transfer(flowFile, REL_RETRY);
|
||||||
|
}
|
||||||
|
context.yield();
|
||||||
|
|
||||||
|
} catch (ElasticsearchParseException esp) {
|
||||||
|
logger.error("Failed to insert {} into ElasticSearch Parse Exception {}", new Object[]{esp}, esp);
|
||||||
|
|
||||||
|
for (final FlowFile flowFile : flowFiles) {
|
||||||
|
session.transfer(flowFile, REL_FAILURE);
|
||||||
|
}
|
||||||
|
context.yield();
|
||||||
|
|
||||||
|
} catch (ElasticsearchException e) {
|
||||||
|
logger.error("Failed to insert {} into ElasticSearch due to {}", new Object[]{e}, e);
|
||||||
|
|
||||||
|
for (final FlowFile flowFile : flowFiles) {
|
||||||
|
session.transfer(flowFile, REL_FAILURE);
|
||||||
|
}
|
||||||
|
context.yield();
|
||||||
|
|
||||||
|
} catch (Exception e) {
|
||||||
|
logger.error("Failed to insert {} into ElasticSearch due to {}", new Object[]{e}, e);
|
||||||
|
|
||||||
|
for (final FlowFile flowFile : flowFiles) {
|
||||||
|
session.transfer(flowFile, REL_FAILURE);
|
||||||
|
}
|
||||||
|
context.yield();
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the ES bulk request for the session
|
||||||
|
*
|
||||||
|
* @param session ProcessSession
|
||||||
|
* @param flowFiles Flowfiles pulled off of the queue to batch in
|
||||||
|
* @return BulkeRequestBuilder
|
||||||
|
*/
|
||||||
|
private BulkRequestBuilder GetEsBulkRequest(final ProcessSession session, final List<FlowFile> flowFiles) {
|
||||||
|
|
||||||
|
final BulkRequestBuilder bulk = esClient.prepareBulk();
|
||||||
|
for (FlowFile file : flowFiles) {
|
||||||
|
final byte[] content = new byte[(int) file.getSize()];
|
||||||
|
session.read(file, new InputStreamCallback() {
|
||||||
|
@Override
|
||||||
|
public void process(final InputStream in) throws IOException {
|
||||||
|
StreamUtils.fillBuffer(in, content, true);
|
||||||
|
|
||||||
|
final String input = new String(content);
|
||||||
|
final JsonParser parser = new JsonParser();
|
||||||
|
final JsonObject json = parser.parse(input).getAsJsonObject();
|
||||||
|
bulk.add(esClient.prepareIndex(getIndex(json), getType(json), getId(json))
|
||||||
|
.setSource(getSource(json)));
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
});
|
||||||
|
|
||||||
|
}
|
||||||
|
return bulk;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,15 @@
|
||||||
|
# Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
# contributor license agreements. See the NOTICE file distributed with
|
||||||
|
# this work for additional information regarding copyright ownership.
|
||||||
|
# The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
# (the "License"); you may not use this file except in compliance with
|
||||||
|
# the License. You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
org.apache.nifi.processors.elasticsearch.PutElasticSearch
|
|
@ -0,0 +1,103 @@
|
||||||
|
package org.apache.nifi.processors.elasticsearch;
|
||||||
|
|
||||||
|
import com.google.gson.JsonObject;
|
||||||
|
import com.google.gson.JsonParser;
|
||||||
|
import org.apache.nifi.util.MockFlowFile;
|
||||||
|
import org.apache.nifi.util.TestRunner;
|
||||||
|
import org.apache.nifi.util.TestRunners;
|
||||||
|
import org.junit.*;
|
||||||
|
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.InputStream;
|
||||||
|
|
||||||
|
public class TestPutElasticSearch {
|
||||||
|
|
||||||
|
private InputStream twitterExample;
|
||||||
|
private TestRunner runner;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setUp() throws IOException {
|
||||||
|
ClassLoader classloader = Thread.currentThread().getContextClassLoader();
|
||||||
|
twitterExample = classloader
|
||||||
|
.getResourceAsStream("TweetExample.json");
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void teardown() {
|
||||||
|
runner = null;
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Test
|
||||||
|
@Ignore("Comment this out if you want to run against local or test ES")
|
||||||
|
public void testPutElasticSearchBasic() throws IOException {
|
||||||
|
System.out.println("Starting test " + new Object() {
|
||||||
|
}.getClass().getEnclosingMethod().getName());
|
||||||
|
final TestRunner runner = TestRunners.newTestRunner(new PutElasticSearch());
|
||||||
|
runner.setValidateExpressionUsage(false);
|
||||||
|
//Local Cluster - Mac pulled from brew
|
||||||
|
runner.setProperty(AbstractElasticSearchProcessor.CLUSTER_NAME, "elasticsearch_brew");
|
||||||
|
runner.setProperty(AbstractElasticSearchProcessor.HOSTS, "127.0.0.1:9300");
|
||||||
|
runner.setProperty(AbstractElasticSearchProcessor.PING_TIMEOUT, "5s");
|
||||||
|
runner.setProperty(AbstractElasticSearchProcessor.SAMPLER_INTERVAL, "5s");
|
||||||
|
runner.setProperty(AbstractElasticSearchProcessor.INDEX_STRATEGY, "Monthly");
|
||||||
|
runner.setProperty(PutElasticSearch.BATCH_SIZE, "1");
|
||||||
|
|
||||||
|
|
||||||
|
runner.enqueue(twitterExample);
|
||||||
|
runner.run();
|
||||||
|
|
||||||
|
runner.assertAllFlowFilesTransferred(PutElasticSearch.REL_SUCCESS, 1);
|
||||||
|
final MockFlowFile out = runner.getFlowFilesForRelationship(PutElasticSearch.REL_SUCCESS).get(0);
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
@Ignore("Comment this out if you want to run against local or test ES")
|
||||||
|
public void testPutElasticSearchBatch() throws IOException {
|
||||||
|
System.out.println("Starting test " + new Object() {
|
||||||
|
}.getClass().getEnclosingMethod().getName());
|
||||||
|
final TestRunner runner = TestRunners.newTestRunner(new PutElasticSearch());
|
||||||
|
runner.setValidateExpressionUsage(false);
|
||||||
|
//Local Cluster - Mac pulled from brew
|
||||||
|
runner.setProperty(AbstractElasticSearchProcessor.CLUSTER_NAME, "elasticsearch_brew");
|
||||||
|
runner.setProperty(AbstractElasticSearchProcessor.HOSTS, "127.0.0.1:9300");
|
||||||
|
runner.setProperty(AbstractElasticSearchProcessor.PING_TIMEOUT, "5s");
|
||||||
|
runner.setProperty(AbstractElasticSearchProcessor.SAMPLER_INTERVAL, "5s");
|
||||||
|
runner.setProperty(AbstractElasticSearchProcessor.INDEX_STRATEGY, "Monthly");
|
||||||
|
runner.setProperty(PutElasticSearch.BATCH_SIZE, "100");
|
||||||
|
|
||||||
|
JsonParser parser = new JsonParser();
|
||||||
|
JsonObject json;
|
||||||
|
String message = convertStreamToString(twitterExample);
|
||||||
|
for (int i = 0;i < 100; i++){
|
||||||
|
|
||||||
|
json = parser.parse(message).getAsJsonObject();
|
||||||
|
String id = json.get("id").getAsString();
|
||||||
|
long newId = Long.parseLong(id) + i;
|
||||||
|
json.addProperty("id", newId);
|
||||||
|
runner.enqueue(message.getBytes());
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
runner.run();
|
||||||
|
|
||||||
|
runner.assertAllFlowFilesTransferred(PutElasticSearch.REL_SUCCESS, 100);
|
||||||
|
final MockFlowFile out = runner.getFlowFilesForRelationship(PutElasticSearch.REL_SUCCESS).get(0);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Convert an input stream to a stream
|
||||||
|
* @param is input the input stream
|
||||||
|
* @return return the converted input stream as a string
|
||||||
|
*/
|
||||||
|
static String convertStreamToString(java.io.InputStream is) {
|
||||||
|
java.util.Scanner s = new java.util.Scanner(is).useDelimiter("\\A");
|
||||||
|
return s.hasNext() ? s.next() : "";
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,83 @@
|
||||||
|
|
||||||
|
{
|
||||||
|
"coordinates": null,
|
||||||
|
"created_at": "Thu Oct 21 16:02:46 +0000 2010",
|
||||||
|
"favorited": false,
|
||||||
|
"truncated": false,
|
||||||
|
"id_str": "28039652140",
|
||||||
|
"entities": {
|
||||||
|
"urls": [
|
||||||
|
{
|
||||||
|
"expanded_url": null,
|
||||||
|
"url": "http://gnip.com/success_stories",
|
||||||
|
"indices": [
|
||||||
|
69,
|
||||||
|
100
|
||||||
|
]
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"hashtags": [
|
||||||
|
|
||||||
|
],
|
||||||
|
"user_mentions": [
|
||||||
|
{
|
||||||
|
"name": "Gnip, Inc.",
|
||||||
|
"id_str": "16958875",
|
||||||
|
"id": 16958875,
|
||||||
|
"indices": [
|
||||||
|
25,
|
||||||
|
30
|
||||||
|
],
|
||||||
|
"screen_name": "gnip"
|
||||||
|
}
|
||||||
|
]
|
||||||
|
},
|
||||||
|
"in_reply_to_user_id_str": null,
|
||||||
|
"text": "what we've been up to at @gnip -- delivering data to happy customers http://gnip.com/success_stories",
|
||||||
|
"contributors": null,
|
||||||
|
"id": 28039652140,
|
||||||
|
"retweet_count": null,
|
||||||
|
"in_reply_to_status_id_str": null,
|
||||||
|
"geo": null,
|
||||||
|
"retweeted": false,
|
||||||
|
"in_reply_to_user_id": null,
|
||||||
|
"user": {
|
||||||
|
"profile_sidebar_border_color": "C0DEED",
|
||||||
|
"name": "Gnip, Inc.",
|
||||||
|
"profile_sidebar_fill_color": "DDEEF6",
|
||||||
|
"profile_background_tile": false,
|
||||||
|
"profile_image_url": "http://a3.twimg.com/profile_images/62803643/icon_normal.png",
|
||||||
|
"location": "Boulder, CO",
|
||||||
|
"created_at": "Fri Oct 24 23:22:09 +0000 2008",
|
||||||
|
"id_str": "16958875",
|
||||||
|
"follow_request_sent": false,
|
||||||
|
"profile_link_color": "0084B4",
|
||||||
|
"favourites_count": 1,
|
||||||
|
"url": "http://blog.gnip.com",
|
||||||
|
"contributors_enabled": false,
|
||||||
|
"utc_offset": -25200,
|
||||||
|
"id": 16958875,
|
||||||
|
"profile_use_background_image": true,
|
||||||
|
"listed_count": 23,
|
||||||
|
"protected": false,
|
||||||
|
"lang": "en",
|
||||||
|
"profile_text_color": "333333",
|
||||||
|
"followers_count": 260,
|
||||||
|
"time_zone": "Mountain Time (US & Canada)",
|
||||||
|
"verified": false,
|
||||||
|
"geo_enabled": true,
|
||||||
|
"profile_background_color": "C0DEED",
|
||||||
|
"notifications": false,
|
||||||
|
"description": "Gnip makes it really easy for you to collect social data for your business.",
|
||||||
|
"friends_count": 71,
|
||||||
|
"profile_background_image_url": "http://s.twimg.com/a/1287010001/images/themes/theme1/bg.png",
|
||||||
|
"statuses_count": 302,
|
||||||
|
"screen_name": "gnip",
|
||||||
|
"following": false,
|
||||||
|
"show_all_inline_media": false
|
||||||
|
},
|
||||||
|
"in_reply_to_screen_name": null,
|
||||||
|
"source": "web",
|
||||||
|
"place": null,
|
||||||
|
"in_reply_to_status_id": null
|
||||||
|
}
|
|
@ -0,0 +1,21 @@
|
||||||
|
<?xml version="1.0" encoding="UTF-8"?>
|
||||||
|
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||||
|
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||||
|
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||||
|
<modelVersion>4.0.0</modelVersion>
|
||||||
|
|
||||||
|
<parent>
|
||||||
|
<groupId>org.apache.nifi</groupId>
|
||||||
|
<artifactId>nifi-nar-bundles</artifactId>
|
||||||
|
<version>0.4.1-SNAPSHOT</version>
|
||||||
|
</parent>
|
||||||
|
|
||||||
|
<groupId>org.apache.nifi</groupId>
|
||||||
|
<artifactId>nifi-elasticsearch-bundle</artifactId>
|
||||||
|
<packaging>pom</packaging>
|
||||||
|
<modules>
|
||||||
|
<module>nifi-elasticsearch-nar</module>
|
||||||
|
<module>nifi-elasticsearch-processors</module>
|
||||||
|
</modules>
|
||||||
|
|
||||||
|
</project>
|
Loading…
Reference in New Issue