Added path home property and unit test to elasticsearch processor in support of the node client

This commit is contained in:
scarpacci 2015-12-30 09:27:49 -08:00 committed by Matt Burgess
parent 943d0a6e53
commit 0c137bc22d
3 changed files with 114 additions and 9 deletions

View File

@ -81,6 +81,16 @@ public abstract class AbstractElasticsearchProcessor extends AbstractProcessor {
.required(false)
.addValidator(new ElasticsearchClientValidator())
.build();
protected static final PropertyDescriptor PATH_HOME = new PropertyDescriptor.Builder()
.name("ElasticSearch Path Home")
.description("ElasticSearch node client requires that path.home be set. For example, "
+ "/usr/share/elasticsearch or /usr/local/opt/elasticsearch for homebrew intall "
+ "https://www.elastic.co/guide/en/elasticsearch/reference/current/setup-dir-layout.html")
.required(false)
.addValidator(new ElasticsearchClientValidator())
.build();
protected static final PropertyDescriptor PING_TIMEOUT = new PropertyDescriptor.Builder()
.name("ElasticSearch Ping Timeout")
.description("The ping timeout used to determine when a node is unreachable. " +
@ -89,6 +99,7 @@ public abstract class AbstractElasticsearchProcessor extends AbstractProcessor {
.defaultValue("5s")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
protected static final PropertyDescriptor SAMPLER_INTERVAL = new PropertyDescriptor.Builder()
.name("Sampler Interval")
.description("Node sampler interval. For example, 5s (5 seconds) If non-local recommended is 30s")
@ -144,7 +155,14 @@ public abstract class AbstractElasticsearchProcessor extends AbstractProcessor {
}
esClient = transportClient;
} else if ("node".equals(clusterType)) {
esClient = NodeBuilder.nodeBuilder().clusterName(clusterName).node().client();
final String pathHome = context.getProperty(PATH_HOME).toString();
//create new node client
Settings settings = Settings.settingsBuilder()
.put("path.home", pathHome)
.build();
esClient = NodeBuilder.nodeBuilder().clusterName(clusterName).settings(settings).node().client();
}
} catch (Exception e) {
log.error("Failed to create Elasticsearch client due to {}", new Object[]{e}, e);
@ -205,7 +223,7 @@ public abstract class AbstractElasticsearchProcessor extends AbstractProcessor {
}
/**
* A custom validator for the Elasticsearch properties list. For example, the hostnames property doesn't need to
* A custom validator for the ElasticSearch properties list. For example, the hostnames property doesn't need to
* be filled in for a Node client, as it joins the cluster by name. Alternatively if a Transport client
*/
protected static class ElasticsearchClientValidator implements Validator {
@ -220,6 +238,16 @@ public abstract class AbstractElasticsearchProcessor extends AbstractProcessor {
CLIENT_TYPE.getName(), clientTypeProperty.getValue(), context);
}
}
// Only validate Path home if client type == Node
if (PATH_HOME.getName().equals(subject)) {
PropertyValue clientTypeProperty = context.getProperty(CLIENT_TYPE);
if (NODE_CLIENT.getValue().equals(clientTypeProperty.getValue())) {
return StandardValidators.NON_EMPTY_VALIDATOR.validate(
CLIENT_TYPE.getName(), clientTypeProperty.getValue(), context);
}
}
return VALID.validate(subject, input, context);
}
}

View File

@ -116,6 +116,7 @@ public class PutElasticsearch extends AbstractElasticsearchProcessor {
descriptors.add(CLIENT_TYPE);
descriptors.add(CLUSTER_NAME);
descriptors.add(HOSTS);
descriptors.add(PATH_HOME);
descriptors.add(PING_TIMEOUT);
descriptors.add(SAMPLER_INTERVAL);
descriptors.add(ID_ATTRIBUTE);

View File

@ -21,8 +21,6 @@ import com.google.gson.JsonParser;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.MockProcessContext;
import org.apache.nifi.util.MockProcessorInitializationContext;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.elasticsearch.action.ActionListener;
@ -32,11 +30,8 @@ import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexAction;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.support.AbstractListenableActionFuture;
import org.elasticsearch.action.support.AdapterActionFuture;
import org.elasticsearch.action.support.PlainListenableActionFuture;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.unit.TimeValue;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
@ -125,6 +120,35 @@ public class TestPutElasticsearch {
out.assertAttributeEquals("tweet_id", "28039652140");
}
@Test
public void testPutElasticSearchOnTriggerNode() throws IOException {
runner = TestRunners.newTestRunner(new ElasticsearchTestProcessor(false)); // no failures
runner.setValidateExpressionUsage(false);
runner.setProperty(AbstractElasticsearchProcessor.CLIENT_TYPE,"node");
runner.setProperty(AbstractElasticsearchProcessor.CLUSTER_NAME, "elasticsearch_brew");
runner.setProperty(AbstractElasticsearchProcessor.HOSTS, "127.0.0.1:9300");
runner.setProperty(AbstractElasticsearchProcessor.PING_TIMEOUT, "5s");
runner.setProperty(AbstractElasticsearchProcessor.SAMPLER_INTERVAL, "5s");
runner.setProperty(PutElasticsearch.INDEX, "tweet");
runner.assertNotValid();
runner.setProperty(PutElasticsearch.TYPE, "status");
runner.setProperty(PutElasticsearch.BATCH_SIZE, "1");
runner.assertNotValid();
runner.setProperty(PutElasticsearch.ID_ATTRIBUTE, "tweet_id");
runner.assertValid();
runner.enqueue(twitterExample, new HashMap<String, String>() {{
put("tweet_id", "28039652141");
}});
runner.run(1, true, true);
runner.assertAllFlowFilesTransferred(PutElasticsearch.REL_SUCCESS, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(PutElasticsearch.REL_SUCCESS).get(0);
assertNotNull(out);
out.assertAttributeEquals("tweet_id", "28039652141");
}
/**
* A Test class that extends the processor in order to inject/mock behavior
*/
@ -197,14 +221,56 @@ public class TestPutElasticsearch {
}.getClass().getEnclosingMethod().getName());
final TestRunner runner = TestRunners.newTestRunner(new PutElasticsearch());
runner.setValidateExpressionUsage(false);
//Local Cluster - Mac pulled from brew
runner.setProperty(AbstractElasticsearchProcessor.CLIENT_TYPE, "transport");
runner.setProperty(AbstractElasticsearchProcessor.CLUSTER_NAME, "elasticsearch_brew");
runner.setProperty(AbstractElasticsearchProcessor.HOSTS, "127.0.0.1:9300");
runner.setProperty(AbstractElasticsearchProcessor.PING_TIMEOUT, "5s");
runner.setProperty(AbstractElasticsearchProcessor.SAMPLER_INTERVAL, "5s");
runner.setProperty(PutElasticsearch.INDEX, "tweet");
runner.setProperty(PutElasticsearch.BATCH_SIZE, "1");
runner.setProperty(PutElasticsearch.TYPE, "status");
runner.setProperty(PutElasticsearch.ID_ATTRIBUTE, "tweet_id");
runner.assertValid();
runner.enqueue(twitterExample, new HashMap<String, String>() {{
put("tweet_id", "28039652140");
}});
runner.enqueue(twitterExample);
runner.run(1, true, true);
runner.assertAllFlowFilesTransferred(PutElasticsearch.REL_SUCCESS, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(PutElasticsearch.REL_SUCCESS).get(0);
}
@Test
@Ignore("Comment this out if you want to run against local or test ES")
public void testPutElasticSearchBasicNode() throws IOException {
System.out.println("Starting test " + new Object() {
}.getClass().getEnclosingMethod().getName());
final TestRunner runner = TestRunners.newTestRunner(new PutElasticsearch());
runner.setValidateExpressionUsage(false);
//Local Cluster - Mac pulled from brew
runner.setProperty(AbstractElasticsearchProcessor.CLIENT_TYPE, "node");
runner.setProperty(AbstractElasticsearchProcessor.CLUSTER_NAME, "elasticsearch_brew");
runner.setProperty(AbstractElasticsearchProcessor.PATH_HOME, "/usr/local/opt/elasticsearch");
runner.setProperty(PutElasticsearch.INDEX, "tweet");
runner.setProperty(PutElasticsearch.BATCH_SIZE, "1");
runner.setProperty(PutElasticsearch.TYPE, "status");
runner.setProperty(PutElasticsearch.ID_ATTRIBUTE, "tweet_id");
runner.assertValid();
runner.enqueue(twitterExample, new HashMap<String, String>() {{
put("tweet_id", "28039652141");
}});
runner.enqueue(twitterExample);
runner.run(1, true, true);
@ -221,7 +287,9 @@ public class TestPutElasticsearch {
}.getClass().getEnclosingMethod().getName());
final TestRunner runner = TestRunners.newTestRunner(new PutElasticsearch());
runner.setValidateExpressionUsage(false);
//Local Cluster - Mac pulled from brew
runner.setProperty(AbstractElasticsearchProcessor.CLIENT_TYPE, "transport");
runner.setProperty(AbstractElasticsearchProcessor.CLUSTER_NAME, "elasticsearch_brew");
runner.setProperty(AbstractElasticsearchProcessor.HOSTS, "127.0.0.1:9300");
runner.setProperty(AbstractElasticsearchProcessor.PING_TIMEOUT, "5s");
@ -229,6 +297,11 @@ public class TestPutElasticsearch {
runner.setProperty(PutElasticsearch.INDEX, "tweet");
runner.setProperty(PutElasticsearch.BATCH_SIZE, "100");
runner.setProperty(PutElasticsearch.TYPE, "status");
runner.setProperty(PutElasticsearch.ID_ATTRIBUTE, "tweet_id");
runner.assertValid();
JsonParser parser = new JsonParser();
JsonObject json;
String message = convertStreamToString(twitterExample);
@ -237,8 +310,11 @@ public class TestPutElasticsearch {
json = parser.parse(message).getAsJsonObject();
String id = json.get("id").getAsString();
long newId = Long.parseLong(id) + i;
json.addProperty("id", newId);
runner.enqueue(message.getBytes());
final String newStrId = Long.toString(newId);
//json.addProperty("id", newId);
runner.enqueue(message.getBytes(), new HashMap<String, String>() {{
put("tweet_id", newStrId);
}});
}