diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchProcessor.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchProcessor.java index 95731781f2..71a116be7a 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchProcessor.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchProcessor.java @@ -81,6 +81,16 @@ public abstract class AbstractElasticsearchProcessor extends AbstractProcessor { .required(false) .addValidator(new ElasticsearchClientValidator()) .build(); + + protected static final PropertyDescriptor PATH_HOME = new PropertyDescriptor.Builder() + .name("ElasticSearch Path Home") + .description("ElasticSearch node client requires that path.home be set. For example, " + + "/usr/share/elasticsearch or /usr/local/opt/elasticsearch for homebrew intall " + + "https://www.elastic.co/guide/en/elasticsearch/reference/current/setup-dir-layout.html") + .required(false) + .addValidator(new ElasticsearchClientValidator()) + .build(); + protected static final PropertyDescriptor PING_TIMEOUT = new PropertyDescriptor.Builder() .name("ElasticSearch Ping Timeout") .description("The ping timeout used to determine when a node is unreachable. " + @@ -89,6 +99,7 @@ public abstract class AbstractElasticsearchProcessor extends AbstractProcessor { .defaultValue("5s") .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); + protected static final PropertyDescriptor SAMPLER_INTERVAL = new PropertyDescriptor.Builder() .name("Sampler Interval") .description("Node sampler interval. For example, 5s (5 seconds) If non-local recommended is 30s") @@ -144,7 +155,14 @@ public abstract class AbstractElasticsearchProcessor extends AbstractProcessor { } esClient = transportClient; } else if ("node".equals(clusterType)) { - esClient = NodeBuilder.nodeBuilder().clusterName(clusterName).node().client(); + + final String pathHome = context.getProperty(PATH_HOME).toString(); + //create new node client + Settings settings = Settings.settingsBuilder() + .put("path.home", pathHome) + .build(); + + esClient = NodeBuilder.nodeBuilder().clusterName(clusterName).settings(settings).node().client(); } } catch (Exception e) { log.error("Failed to create Elasticsearch client due to {}", new Object[]{e}, e); @@ -205,7 +223,7 @@ public abstract class AbstractElasticsearchProcessor extends AbstractProcessor { } /** - * A custom validator for the Elasticsearch properties list. For example, the hostnames property doesn't need to + * A custom validator for the ElasticSearch properties list. For example, the hostnames property doesn't need to * be filled in for a Node client, as it joins the cluster by name. Alternatively if a Transport client */ protected static class ElasticsearchClientValidator implements Validator { @@ -220,6 +238,16 @@ public abstract class AbstractElasticsearchProcessor extends AbstractProcessor { CLIENT_TYPE.getName(), clientTypeProperty.getValue(), context); } } + + // Only validate Path home if client type == Node + if (PATH_HOME.getName().equals(subject)) { + PropertyValue clientTypeProperty = context.getProperty(CLIENT_TYPE); + if (NODE_CLIENT.getValue().equals(clientTypeProperty.getValue())) { + return StandardValidators.NON_EMPTY_VALIDATOR.validate( + CLIENT_TYPE.getName(), clientTypeProperty.getValue(), context); + } + } + return VALID.validate(subject, input, context); } } diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearch.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearch.java index 003aa30005..43f3a6bf52 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearch.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearch.java @@ -116,6 +116,7 @@ public class PutElasticsearch extends AbstractElasticsearchProcessor { descriptors.add(CLIENT_TYPE); descriptors.add(CLUSTER_NAME); descriptors.add(HOSTS); + descriptors.add(PATH_HOME); descriptors.add(PING_TIMEOUT); descriptors.add(SAMPLER_INTERVAL); descriptors.add(ID_ATTRIBUTE); diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearch.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearch.java index 68a0a78e89..6af8fd2448 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearch.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearch.java @@ -21,8 +21,6 @@ import com.google.gson.JsonParser; import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.util.MockFlowFile; -import org.apache.nifi.util.MockProcessContext; -import org.apache.nifi.util.MockProcessorInitializationContext; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; import org.elasticsearch.action.ActionListener; @@ -32,11 +30,8 @@ import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.index.IndexAction; import org.elasticsearch.action.index.IndexRequestBuilder; -import org.elasticsearch.action.support.AbstractListenableActionFuture; import org.elasticsearch.action.support.AdapterActionFuture; -import org.elasticsearch.action.support.PlainListenableActionFuture; import org.elasticsearch.client.Client; -import org.elasticsearch.common.unit.TimeValue; import org.junit.After; import org.junit.Before; import org.junit.Ignore; @@ -125,6 +120,35 @@ public class TestPutElasticsearch { out.assertAttributeEquals("tweet_id", "28039652140"); } + @Test + public void testPutElasticSearchOnTriggerNode() throws IOException { + runner = TestRunners.newTestRunner(new ElasticsearchTestProcessor(false)); // no failures + runner.setValidateExpressionUsage(false); + runner.setProperty(AbstractElasticsearchProcessor.CLIENT_TYPE,"node"); + runner.setProperty(AbstractElasticsearchProcessor.CLUSTER_NAME, "elasticsearch_brew"); + runner.setProperty(AbstractElasticsearchProcessor.HOSTS, "127.0.0.1:9300"); + runner.setProperty(AbstractElasticsearchProcessor.PING_TIMEOUT, "5s"); + runner.setProperty(AbstractElasticsearchProcessor.SAMPLER_INTERVAL, "5s"); + + runner.setProperty(PutElasticsearch.INDEX, "tweet"); + runner.assertNotValid(); + runner.setProperty(PutElasticsearch.TYPE, "status"); + runner.setProperty(PutElasticsearch.BATCH_SIZE, "1"); + runner.assertNotValid(); + runner.setProperty(PutElasticsearch.ID_ATTRIBUTE, "tweet_id"); + runner.assertValid(); + + runner.enqueue(twitterExample, new HashMap() {{ + put("tweet_id", "28039652141"); + }}); + runner.run(1, true, true); + + runner.assertAllFlowFilesTransferred(PutElasticsearch.REL_SUCCESS, 1); + final MockFlowFile out = runner.getFlowFilesForRelationship(PutElasticsearch.REL_SUCCESS).get(0); + assertNotNull(out); + out.assertAttributeEquals("tweet_id", "28039652141"); + } + /** * A Test class that extends the processor in order to inject/mock behavior */ @@ -197,14 +221,56 @@ public class TestPutElasticsearch { }.getClass().getEnclosingMethod().getName()); final TestRunner runner = TestRunners.newTestRunner(new PutElasticsearch()); runner.setValidateExpressionUsage(false); + //Local Cluster - Mac pulled from brew + runner.setProperty(AbstractElasticsearchProcessor.CLIENT_TYPE, "transport"); runner.setProperty(AbstractElasticsearchProcessor.CLUSTER_NAME, "elasticsearch_brew"); runner.setProperty(AbstractElasticsearchProcessor.HOSTS, "127.0.0.1:9300"); runner.setProperty(AbstractElasticsearchProcessor.PING_TIMEOUT, "5s"); runner.setProperty(AbstractElasticsearchProcessor.SAMPLER_INTERVAL, "5s"); + runner.setProperty(PutElasticsearch.INDEX, "tweet"); runner.setProperty(PutElasticsearch.BATCH_SIZE, "1"); + runner.setProperty(PutElasticsearch.TYPE, "status"); + runner.setProperty(PutElasticsearch.ID_ATTRIBUTE, "tweet_id"); + runner.assertValid(); + + runner.enqueue(twitterExample, new HashMap() {{ + put("tweet_id", "28039652140"); + }}); + + + runner.enqueue(twitterExample); + runner.run(1, true, true); + + runner.assertAllFlowFilesTransferred(PutElasticsearch.REL_SUCCESS, 1); + final MockFlowFile out = runner.getFlowFilesForRelationship(PutElasticsearch.REL_SUCCESS).get(0); + + } + + @Test + @Ignore("Comment this out if you want to run against local or test ES") + public void testPutElasticSearchBasicNode() throws IOException { + System.out.println("Starting test " + new Object() { + }.getClass().getEnclosingMethod().getName()); + final TestRunner runner = TestRunners.newTestRunner(new PutElasticsearch()); + runner.setValidateExpressionUsage(false); + + //Local Cluster - Mac pulled from brew + runner.setProperty(AbstractElasticsearchProcessor.CLIENT_TYPE, "node"); + runner.setProperty(AbstractElasticsearchProcessor.CLUSTER_NAME, "elasticsearch_brew"); + runner.setProperty(AbstractElasticsearchProcessor.PATH_HOME, "/usr/local/opt/elasticsearch"); + runner.setProperty(PutElasticsearch.INDEX, "tweet"); + runner.setProperty(PutElasticsearch.BATCH_SIZE, "1"); + + runner.setProperty(PutElasticsearch.TYPE, "status"); + runner.setProperty(PutElasticsearch.ID_ATTRIBUTE, "tweet_id"); + runner.assertValid(); + + runner.enqueue(twitterExample, new HashMap() {{ + put("tweet_id", "28039652141"); + }}); runner.enqueue(twitterExample); runner.run(1, true, true); @@ -221,7 +287,9 @@ public class TestPutElasticsearch { }.getClass().getEnclosingMethod().getName()); final TestRunner runner = TestRunners.newTestRunner(new PutElasticsearch()); runner.setValidateExpressionUsage(false); + //Local Cluster - Mac pulled from brew + runner.setProperty(AbstractElasticsearchProcessor.CLIENT_TYPE, "transport"); runner.setProperty(AbstractElasticsearchProcessor.CLUSTER_NAME, "elasticsearch_brew"); runner.setProperty(AbstractElasticsearchProcessor.HOSTS, "127.0.0.1:9300"); runner.setProperty(AbstractElasticsearchProcessor.PING_TIMEOUT, "5s"); @@ -229,6 +297,11 @@ public class TestPutElasticsearch { runner.setProperty(PutElasticsearch.INDEX, "tweet"); runner.setProperty(PutElasticsearch.BATCH_SIZE, "100"); + runner.setProperty(PutElasticsearch.TYPE, "status"); + runner.setProperty(PutElasticsearch.ID_ATTRIBUTE, "tweet_id"); + runner.assertValid(); + + JsonParser parser = new JsonParser(); JsonObject json; String message = convertStreamToString(twitterExample); @@ -237,8 +310,11 @@ public class TestPutElasticsearch { json = parser.parse(message).getAsJsonObject(); String id = json.get("id").getAsString(); long newId = Long.parseLong(id) + i; - json.addProperty("id", newId); - runner.enqueue(message.getBytes()); + final String newStrId = Long.toString(newId); + //json.addProperty("id", newId); + runner.enqueue(message.getBytes(), new HashMap() {{ + put("tweet_id", newStrId); + }}); }