diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-nar/pom.xml b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-nar/pom.xml
new file mode 100644
index 0000000000..44492a5f55
--- /dev/null
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-nar/pom.xml
@@ -0,0 +1,24 @@
+
+
+ 4.0.0
+
+ nifi-elasticsearch-bundle
+ org.apache.nifi
+ 0.4.1-SNAPSHOT
+
+
+ gov.pnnl.nifi
+ nifi-elasticsearch-nar
+ nar
+
+
+
+ org.apache.nifi
+ nifi-elasticsearch-processors
+ 0.4.1-SNAPSHOT
+
+
+
+
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-nar/src/main/resources/META-INF/NOTICE.txt b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-nar/src/main/resources/META-INF/NOTICE.txt
new file mode 100644
index 0000000000..8c660af424
--- /dev/null
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-nar/src/main/resources/META-INF/NOTICE.txt
@@ -0,0 +1,24 @@
+nifi-elasticsearch-nar
+Copyright 2015 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+
+******************
+Apache Software License v2
+******************
+
+The following binary components are provided under the Apache Software License v2
+
+ (ASLv2) Apache Commons Lang
+ The following NOTICE information applies:
+ Apache Commons Lang
+ Copyright 2001-2014 The Apache Software Foundation
+
+ This product includes software from the Spring Framework,
+ under the Apache License 2.0 (see: StringUtils.containsWhitespace())
+
+ (ASLv2) Apache Commons IO
+ The following NOTICE information applies:
+ Apache Commons IO
+ Copyright 2002-2012 The Apache Software Foundation
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/pom.xml b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/pom.xml
new file mode 100644
index 0000000000..3208ced1d3
--- /dev/null
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/pom.xml
@@ -0,0 +1,63 @@
+
+
+ 4.0.0
+
+ nifi-elasticsearch-bundle
+ org.apache.nifi
+ 0.4.1-SNAPSHOT
+
+
+ org.apache.nifi
+ nifi-elasticsearch-processors
+
+
+ 1.7.12
+ 1.7.1
+ 2.4
+ 2.9.1
+
+
+
+
+ org.apache.nifi
+ nifi-api
+
+
+ org.apache.nifi
+ nifi-processor-utils
+
+
+ org.apache.nifi
+ nifi-mock
+ test
+
+
+ org.slf4j
+ slf4j-api
+ ${slf4jversion}
+
+
+ org.slf4j
+ slf4j-simple
+ ${slf4jversion}
+
+
+ org.elasticsearch
+ elasticsearch
+ ${es.version}
+
+
+ com.google.code.gson
+ gson
+ ${gsonversion}
+
+
+ joda-time
+ joda-time
+ ${jodatimeversion}
+
+
+
+
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticSearchProcessor.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticSearchProcessor.java
new file mode 100644
index 0000000000..6bfd37f44b
--- /dev/null
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticSearchProcessor.java
@@ -0,0 +1,232 @@
+package org.apache.nifi.processors.elasticsearch;
+
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.common.settings.ImmutableSettings;
+import org.elasticsearch.common.transport.InetSocketTransportAddress;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Locale;
+
+import org.joda.time.DateTime;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+import org.joda.time.format.ISODateTimeFormat;
+
+import com.google.gson.*;
+
+public abstract class AbstractElasticSearchProcessor extends AbstractProcessor{
+
+ protected static final PropertyDescriptor CLUSTER_NAME = new PropertyDescriptor.Builder()
+ .name("Cluster Name")
+ .description("Name of the ES cluster. For example, elasticsearch_brew")
+ .required(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+ protected static final PropertyDescriptor HOSTS = new PropertyDescriptor.Builder()
+ .name("ElasticSearch Hosts")
+ .description("ElasticSearch Hosts, which should be comma separated and colon for hostname/port " +
+ "host1:port,host2:port,.... For example testcluster:9300")
+ .required(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+ protected static final PropertyDescriptor PING_TIMEOUT = new PropertyDescriptor.Builder()
+ .name("ElasticSearch Ping Timeout")
+ .description("The ping timeout used to determine when a node is unreachable. " +
+ "For example, 5s (5 seconds). If non-local recommended is 30s")
+ .required(true)
+ .defaultValue("5s")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+ protected static final PropertyDescriptor SAMPLER_INTERVAL = new PropertyDescriptor.Builder()
+ .name("Sampler Interval")
+ .description("Node sampler interval. For example, 5s (5 seconds) If non-local recommended is 30s")
+ .required(true)
+ .defaultValue("5s")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ protected static final PropertyDescriptor INDEX_STRATEGY = new PropertyDescriptor.Builder()
+ .name("Index Strategy")
+ .description("Pick the index strategy. Yearly, Monthly, Daily, Hourly")
+ .required(true)
+ .defaultValue("Monthly")
+ .allowableValues("Yearly", "Monthly", "Daily", "Hourly")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ protected TransportClient esClient;
+ protected List esHosts;
+ protected String indexPrefix;
+ protected static String indexStrategy;
+
+ /**
+ * Instantiate ElasticSearch Client
+ * @param context
+ * @throws IOException
+ */
+ @OnScheduled
+ public final void createClient(ProcessContext context) throws IOException {
+ if (esClient != null) {
+ closeClient();
+ }
+
+ getLogger().info("Creating ElasticSearch Client");
+
+ try {
+
+ final String clusterName = context.getProperty(CLUSTER_NAME).toString();
+ final String pingTimeout = context.getProperty(PING_TIMEOUT).toString();
+ final String samplerInterval = context.getProperty(SAMPLER_INTERVAL).toString();
+ indexStrategy = context.getProperty(INDEX_STRATEGY).toString();
+
+ //create new transport client
+ esClient = new TransportClient(
+ ImmutableSettings.builder()
+ .put("cluster.name", clusterName)
+ .put("client.transport.ping_timeout", pingTimeout)
+ .put("client.transport.nodes_sampler_interval", samplerInterval),
+ false);
+
+
+ final String hosts = context.getProperty(HOSTS).toString();
+ esHosts = GetEsHosts(hosts);
+
+ for (final InetSocketAddress host : esHosts) {
+ esClient.addTransportAddress(new InetSocketTransportAddress(host));
+ }
+ } catch (Exception e) {
+ getLogger().error("Failed to schedule PutElasticSearch due to {}", new Object[] { e }, e);
+ throw e;
+ }
+ }
+
+ /**
+ * Dispose of ElasticSearch client
+ */
+ @OnStopped
+ public final void closeClient() {
+ if (esClient != null) {
+ getLogger().info("Closing ElasticSearch Client");
+ esClient.close();
+ esClient = null;
+ }
+ }
+
+ /**
+ * Get the ElasticSearch hosts from the Nifi attribute
+ * @param hosts A comma separted list of ElasticSearch hosts
+ * @return List of InetSockeAddresses for the ES hosts
+ */
+ private List GetEsHosts(String hosts){
+
+ final List esList = Arrays.asList(hosts.split(","));
+ List esHosts = new ArrayList<>();
+
+ for(String item : esList){
+
+ String[] addresses = item.split(":");
+ final String hostName = addresses[0];
+ final int port = Integer.parseInt(addresses[1]);
+
+ esHosts.add(new InetSocketAddress(hostName, port));
+ }
+
+ return esHosts;
+
+ }
+
+
+ /**
+ * Get ElasticSearch index for data
+ * @param input
+ * @return
+ */
+ public String getIndex(final JsonObject input) {
+
+ return extractIndexString(input);
+ }
+
+ /**
+ * Get ElasticSearch Type
+ * @param input
+ * @return
+ */
+ public String getType(final JsonObject input) {
+ return "status";
+ }
+
+ /**
+ * Get id for ElasticSearch
+ * @param input
+ * @return
+ */
+ public String getId(final JsonObject input) {
+
+ return input.get("id").getAsString();
+ }
+
+ /**
+ * Get Source for ElasticSearch
+ * @param input
+ * @return
+ */
+ public byte[] getSource(final JsonObject input) {
+ String jsonString = input.toString();
+ jsonString = jsonString.replace("\r\n", " ").replace('\n', ' ').replace('\r', ' ');
+ return jsonString.getBytes(StandardCharsets.UTF_8);
+ }
+
+ /**
+ * Identify ElasticSearch index where data will land
+ * @param parsedJson
+ * @return
+ */
+ private static String extractIndexString(final JsonObject parsedJson) {
+ final String extractedDate = "created_at";
+ if(!parsedJson.has(extractedDate))
+ throw new IllegalArgumentException("Message is missing " + extractedDate);
+
+ final DateTimeFormatter format =
+ DateTimeFormat.forPattern("EEE MMM dd HH:mm:ss Z yyyy").withLocale(Locale.ENGLISH);
+
+ final String dateElement = parsedJson.get(extractedDate).getAsString();
+ final DateTimeFormatter isoFormat = ISODateTimeFormat.dateTime();
+ final DateTime dateTime = isoFormat.parseDateTime(format.parseDateTime(dateElement).toString());
+
+ final DateTimeFormatter dateFormat;
+ //Create ElasticSearch Index
+ switch (indexStrategy){
+
+ case "Yearly":
+ dateFormat = DateTimeFormat.forPattern("yyyy_MM");
+ break;
+ case "Monthly":
+ dateFormat = DateTimeFormat.forPattern("yyyy_MM");
+ break;
+ case "Daily":
+ dateFormat = DateTimeFormat.forPattern("yyyy_MM_dd");
+ break;
+ case "Hourly":
+ dateFormat = DateTimeFormat.forPattern("yyyy_MM_dd_HH");
+ break;
+ default:
+ throw new IllegalArgumentException("Invalid index strategy selected: " + indexStrategy);
+
+ }
+
+ //ElasticSearch indexes must be lowercase
+ final String strategy = indexStrategy.toLowerCase() + "_" + dateFormat.print(dateTime);
+ return strategy;
+ }
+}
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticSearch.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticSearch.java
new file mode 100644
index 0000000000..9740d39dd5
--- /dev/null
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticSearch.java
@@ -0,0 +1,197 @@
+package org.apache.nifi.processors.elasticsearch;
+
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ProcessorLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.elasticsearch.AbstractElasticSearchProcessor;
+import org.apache.nifi.stream.io.StreamUtils;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkRequestBuilder;
+import org.elasticsearch.action.bulk.BulkResponse;
+
+import org.elasticsearch.client.transport.NoNodeAvailableException;
+import org.elasticsearch.*;
+import org.elasticsearch.transport.ReceiveTimeoutTransportException;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.*;
+
+import com.google.gson.*;
+
+@EventDriven
+@Tags({"elasticsearch", "insert", "update", "write", "put"})
+@CapabilityDescription("Writes the contents of a FlowFile to ElasticSearch")
+public class PutElasticSearch extends AbstractElasticSearchProcessor {
+
+ static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
+ .description("All FlowFiles that are written to ElasticSearch are routed to this relationship").build();
+
+ static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
+ .description("All FlowFiles that cannot be written to ElasticSearch are routed to this relationship").build();
+
+ static final Relationship REL_RETRY = new Relationship.Builder()
+ .name("retry")
+ .description("A FlowFile is routed to this relationship if the database cannot be updated but attempting the operation again may succeed")
+ .build();
+
+ public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
+ .name("Batch Size")
+ .description("The preferred number of FlowFiles to put to the database in a single transaction")
+ .required(true)
+ .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+ .defaultValue("100")
+ .build();
+
+ private final List descriptors;
+
+ private final Set relationships;
+
+ public PutElasticSearch() {
+ final List descriptors = new ArrayList<>();
+ descriptors.add(CLUSTER_NAME);
+ descriptors.add(HOSTS);
+ descriptors.add(PING_TIMEOUT);
+ descriptors.add(SAMPLER_INTERVAL);
+ descriptors.add(BATCH_SIZE);
+ descriptors.add(INDEX_STRATEGY);
+ this.descriptors = Collections.unmodifiableList(descriptors);
+
+ final Set relationships = new HashSet<>();
+ relationships.add(REL_SUCCESS);
+ relationships.add(REL_FAILURE);
+ relationships.add(REL_RETRY);
+ this.relationships = Collections.unmodifiableSet(relationships);
+ }
+
+ @Override
+ public Set getRelationships() {
+ return this.relationships;
+ }
+
+ @Override
+ public final List getSupportedPropertyDescriptors() {
+ return descriptors;
+ }
+
+ @Override
+ public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+ final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
+ final List flowFiles = session.get(batchSize);
+ if (flowFiles.isEmpty()) {
+ return;
+ }
+
+ final ProcessorLog logger = getLogger();
+
+ try {
+ final BulkRequestBuilder bulk = GetEsBulkRequest(session, flowFiles);
+ final BulkResponse response = bulk.execute().actionGet();
+ if (response.hasFailures()) {
+ for (final BulkItemResponse item : response.getItems()) {
+ final FlowFile flowFile = flowFiles.get(item.getItemId());
+ if (item.isFailed()) {
+ logger.error("Failed to insert {} into ElasticSearch due to {}",
+ new Object[]{flowFile, item.getFailure()}, new Exception());
+ session.transfer(flowFile, REL_FAILURE);
+
+ } else {
+ session.transfer(flowFile, REL_SUCCESS);
+
+ }
+ }
+ } else {
+ for (final FlowFile flowFile : flowFiles) {
+ session.transfer(flowFile, REL_SUCCESS);
+ }
+ }
+
+
+ } catch (NoNodeAvailableException nne) {
+ logger.error("Failed to insert {} into ElasticSearch No Node Available {}", new Object[]{nne}, nne);
+ for (final FlowFile flowFile : flowFiles) {
+ session.transfer(flowFile, REL_RETRY);
+ }
+ context.yield();
+
+ } catch (ElasticsearchTimeoutException ete) {
+ logger.error("Failed to insert {} into ElasticSearch Timeout to {}", new Object[]{ete}, ete);
+ for (final FlowFile flowFile : flowFiles) {
+ session.transfer(flowFile, REL_RETRY);
+ }
+ context.yield();
+
+ } catch (ReceiveTimeoutTransportException rtt) {
+ logger.error("Failed to insert {} into ElasticSearch ReceiveTimeoutTransportException to {}", new Object[]{rtt}, rtt);
+ for (final FlowFile flowFile : flowFiles) {
+ session.transfer(flowFile, REL_RETRY);
+ }
+ context.yield();
+
+ } catch (ElasticsearchParseException esp) {
+ logger.error("Failed to insert {} into ElasticSearch Parse Exception {}", new Object[]{esp}, esp);
+
+ for (final FlowFile flowFile : flowFiles) {
+ session.transfer(flowFile, REL_FAILURE);
+ }
+ context.yield();
+
+ } catch (ElasticsearchException e) {
+ logger.error("Failed to insert {} into ElasticSearch due to {}", new Object[]{e}, e);
+
+ for (final FlowFile flowFile : flowFiles) {
+ session.transfer(flowFile, REL_FAILURE);
+ }
+ context.yield();
+
+ } catch (Exception e) {
+ logger.error("Failed to insert {} into ElasticSearch due to {}", new Object[]{e}, e);
+
+ for (final FlowFile flowFile : flowFiles) {
+ session.transfer(flowFile, REL_FAILURE);
+ }
+ context.yield();
+
+ }
+ }
+
+ /**
+ * Get the ES bulk request for the session
+ *
+ * @param session ProcessSession
+ * @param flowFiles Flowfiles pulled off of the queue to batch in
+ * @return BulkeRequestBuilder
+ */
+ private BulkRequestBuilder GetEsBulkRequest(final ProcessSession session, final List flowFiles) {
+
+ final BulkRequestBuilder bulk = esClient.prepareBulk();
+ for (FlowFile file : flowFiles) {
+ final byte[] content = new byte[(int) file.getSize()];
+ session.read(file, new InputStreamCallback() {
+ @Override
+ public void process(final InputStream in) throws IOException {
+ StreamUtils.fillBuffer(in, content, true);
+
+ final String input = new String(content);
+ final JsonParser parser = new JsonParser();
+ final JsonObject json = parser.parse(input).getAsJsonObject();
+ bulk.add(esClient.prepareIndex(getIndex(json), getType(json), getId(json))
+ .setSource(getSource(json)));
+
+ }
+
+ });
+
+ }
+ return bulk;
+ }
+}
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
new file mode 100644
index 0000000000..7c14a424a3
--- /dev/null
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -0,0 +1,15 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+org.apache.nifi.processors.elasticsearch.PutElasticSearch
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticSearch.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticSearch.java
new file mode 100644
index 0000000000..c8b7ab65f2
--- /dev/null
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticSearch.java
@@ -0,0 +1,103 @@
+package org.apache.nifi.processors.elasticsearch;
+
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.*;
+
+
+import java.io.IOException;
+import java.io.InputStream;
+
+public class TestPutElasticSearch {
+
+ private InputStream twitterExample;
+ private TestRunner runner;
+
+ @Before
+ public void setUp() throws IOException {
+ ClassLoader classloader = Thread.currentThread().getContextClassLoader();
+ twitterExample = classloader
+ .getResourceAsStream("TweetExample.json");
+
+ }
+
+ @After
+ public void teardown() {
+ runner = null;
+
+ }
+
+
+ @Test
+ @Ignore("Comment this out if you want to run against local or test ES")
+ public void testPutElasticSearchBasic() throws IOException {
+ System.out.println("Starting test " + new Object() {
+ }.getClass().getEnclosingMethod().getName());
+ final TestRunner runner = TestRunners.newTestRunner(new PutElasticSearch());
+ runner.setValidateExpressionUsage(false);
+ //Local Cluster - Mac pulled from brew
+ runner.setProperty(AbstractElasticSearchProcessor.CLUSTER_NAME, "elasticsearch_brew");
+ runner.setProperty(AbstractElasticSearchProcessor.HOSTS, "127.0.0.1:9300");
+ runner.setProperty(AbstractElasticSearchProcessor.PING_TIMEOUT, "5s");
+ runner.setProperty(AbstractElasticSearchProcessor.SAMPLER_INTERVAL, "5s");
+ runner.setProperty(AbstractElasticSearchProcessor.INDEX_STRATEGY, "Monthly");
+ runner.setProperty(PutElasticSearch.BATCH_SIZE, "1");
+
+
+ runner.enqueue(twitterExample);
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(PutElasticSearch.REL_SUCCESS, 1);
+ final MockFlowFile out = runner.getFlowFilesForRelationship(PutElasticSearch.REL_SUCCESS).get(0);
+
+
+ }
+
+ @Test
+ @Ignore("Comment this out if you want to run against local or test ES")
+ public void testPutElasticSearchBatch() throws IOException {
+ System.out.println("Starting test " + new Object() {
+ }.getClass().getEnclosingMethod().getName());
+ final TestRunner runner = TestRunners.newTestRunner(new PutElasticSearch());
+ runner.setValidateExpressionUsage(false);
+ //Local Cluster - Mac pulled from brew
+ runner.setProperty(AbstractElasticSearchProcessor.CLUSTER_NAME, "elasticsearch_brew");
+ runner.setProperty(AbstractElasticSearchProcessor.HOSTS, "127.0.0.1:9300");
+ runner.setProperty(AbstractElasticSearchProcessor.PING_TIMEOUT, "5s");
+ runner.setProperty(AbstractElasticSearchProcessor.SAMPLER_INTERVAL, "5s");
+ runner.setProperty(AbstractElasticSearchProcessor.INDEX_STRATEGY, "Monthly");
+ runner.setProperty(PutElasticSearch.BATCH_SIZE, "100");
+
+ JsonParser parser = new JsonParser();
+ JsonObject json;
+ String message = convertStreamToString(twitterExample);
+ for (int i = 0;i < 100; i++){
+
+ json = parser.parse(message).getAsJsonObject();
+ String id = json.get("id").getAsString();
+ long newId = Long.parseLong(id) + i;
+ json.addProperty("id", newId);
+ runner.enqueue(message.getBytes());
+
+ }
+
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(PutElasticSearch.REL_SUCCESS, 100);
+ final MockFlowFile out = runner.getFlowFilesForRelationship(PutElasticSearch.REL_SUCCESS).get(0);
+
+ }
+
+ /**
+ * Convert an input stream to a stream
+ * @param is input the input stream
+ * @return return the converted input stream as a string
+ */
+ static String convertStreamToString(java.io.InputStream is) {
+ java.util.Scanner s = new java.util.Scanner(is).useDelimiter("\\A");
+ return s.hasNext() ? s.next() : "";
+ }
+}
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/resources/TweetExample.json b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/resources/TweetExample.json
new file mode 100644
index 0000000000..7375be6a2c
--- /dev/null
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/resources/TweetExample.json
@@ -0,0 +1,83 @@
+
+{
+ "coordinates": null,
+ "created_at": "Thu Oct 21 16:02:46 +0000 2010",
+ "favorited": false,
+ "truncated": false,
+ "id_str": "28039652140",
+ "entities": {
+ "urls": [
+ {
+ "expanded_url": null,
+ "url": "http://gnip.com/success_stories",
+ "indices": [
+ 69,
+ 100
+ ]
+ }
+ ],
+ "hashtags": [
+
+ ],
+ "user_mentions": [
+ {
+ "name": "Gnip, Inc.",
+ "id_str": "16958875",
+ "id": 16958875,
+ "indices": [
+ 25,
+ 30
+ ],
+ "screen_name": "gnip"
+ }
+ ]
+ },
+ "in_reply_to_user_id_str": null,
+ "text": "what we've been up to at @gnip -- delivering data to happy customers http://gnip.com/success_stories",
+ "contributors": null,
+ "id": 28039652140,
+ "retweet_count": null,
+ "in_reply_to_status_id_str": null,
+ "geo": null,
+ "retweeted": false,
+ "in_reply_to_user_id": null,
+ "user": {
+ "profile_sidebar_border_color": "C0DEED",
+ "name": "Gnip, Inc.",
+ "profile_sidebar_fill_color": "DDEEF6",
+ "profile_background_tile": false,
+ "profile_image_url": "http://a3.twimg.com/profile_images/62803643/icon_normal.png",
+ "location": "Boulder, CO",
+ "created_at": "Fri Oct 24 23:22:09 +0000 2008",
+ "id_str": "16958875",
+ "follow_request_sent": false,
+ "profile_link_color": "0084B4",
+ "favourites_count": 1,
+ "url": "http://blog.gnip.com",
+ "contributors_enabled": false,
+ "utc_offset": -25200,
+ "id": 16958875,
+ "profile_use_background_image": true,
+ "listed_count": 23,
+ "protected": false,
+ "lang": "en",
+ "profile_text_color": "333333",
+ "followers_count": 260,
+ "time_zone": "Mountain Time (US & Canada)",
+ "verified": false,
+ "geo_enabled": true,
+ "profile_background_color": "C0DEED",
+ "notifications": false,
+ "description": "Gnip makes it really easy for you to collect social data for your business.",
+ "friends_count": 71,
+ "profile_background_image_url": "http://s.twimg.com/a/1287010001/images/themes/theme1/bg.png",
+ "statuses_count": 302,
+ "screen_name": "gnip",
+ "following": false,
+ "show_all_inline_media": false
+ },
+ "in_reply_to_screen_name": null,
+ "source": "web",
+ "place": null,
+ "in_reply_to_status_id": null
+}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/pom.xml b/nifi-nar-bundles/nifi-elasticsearch-bundle/pom.xml
new file mode 100644
index 0000000000..9945c3eac8
--- /dev/null
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/pom.xml
@@ -0,0 +1,21 @@
+
+
+ 4.0.0
+
+
+ org.apache.nifi
+ nifi-nar-bundles
+ 0.4.1-SNAPSHOT
+
+
+ org.apache.nifi
+ nifi-elasticsearch-bundle
+ pom
+
+ nifi-elasticsearch-nar
+ nifi-elasticsearch-processors
+
+
+
\ No newline at end of file