NIFI-1594: Add support to expression language to define index operation

The index operation should be one of "index", "update", " upsert ".

This lets the operation could be defined by incoming flowfile.

Signed-off-by: João Henrique Ferreira de Freitas <joaohf@gmail.com>
Signed-off-by: Matt Burgess <mattyb149@apache.org>

This closes #255
This commit is contained in:
João Henrique Ferreira de Freitas 2016-02-24 14:44:34 -03:00 committed by Matt Burgess
parent fbd299e885
commit e27c2556db
2 changed files with 56 additions and 2 deletions

View File

@ -100,6 +100,16 @@ public class PutElasticsearch extends AbstractElasticsearchProcessor {
AttributeExpression.ResultType.STRING, true))
.build();
public static final PropertyDescriptor INDEX_OP = new PropertyDescriptor.Builder()
.name("Index Operation")
.description("The type of the operation used to index (index, update, upsert)")
.required(true)
.expressionLanguageSupported(true)
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(
AttributeExpression.ResultType.STRING, true))
.defaultValue("index")
.build();
public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
.name("Batch Size")
.description("The preferred number of FlowFiles to put to the database in a single transaction")
@ -134,6 +144,7 @@ public class PutElasticsearch extends AbstractElasticsearchProcessor {
descriptors.add(TYPE);
descriptors.add(CHARSET);
descriptors.add(BATCH_SIZE);
descriptors.add(INDEX_OP);
return Collections.unmodifiableList(descriptors);
}
@ -166,6 +177,7 @@ public class PutElasticsearch extends AbstractElasticsearchProcessor {
for (FlowFile file : flowFiles) {
final String index = context.getProperty(INDEX).evaluateAttributeExpressions(file).getValue();
final String docType = context.getProperty(TYPE).evaluateAttributeExpressions(file).getValue();
final String indexOp = context.getProperty(INDEX_OP).evaluateAttributeExpressions(file).getValue();
final String id = file.getAttribute(id_attribute);
if (id == null) {
@ -178,8 +190,20 @@ public class PutElasticsearch extends AbstractElasticsearchProcessor {
public void process(final InputStream in) throws IOException {
String json = IOUtils.toString(in, charset)
.replace("\r\n", " ").replace('\n', ' ').replace('\r', ' ');
if (indexOp.equalsIgnoreCase("index")) {
bulk.add(esClient.get().prepareIndex(index, docType, id)
.setSource(json.getBytes(charset)));
} else if (indexOp.equalsIgnoreCase("upsert")) {
bulk.add(esClient.get().prepareUpdate(index, docType, id)
.setDoc(json.getBytes(charset))
.setDocAsUpsert(true));
} else if (indexOp.equalsIgnoreCase("update")) {
bulk.add(esClient.get().prepareUpdate(index, docType, id)
.setDoc(json.getBytes(charset)));
} else {
throw new IOException("Index operation: " + indexOp + " not supported.");
}
}
});
}

View File

@ -248,6 +248,36 @@ public class TestPutElasticsearch {
assertNotNull(out2);
}
@Test
public void testPutElasticSearchOnTriggerWithInvalidIndexOp() throws IOException {
runner = TestRunners.newTestRunner(new PutElasticsearchTestProcessor(false)); // no failures
runner.setValidateExpressionUsage(true);
runner.setProperty(AbstractElasticsearchProcessor.CLUSTER_NAME, "elasticsearch");
runner.setProperty(AbstractElasticsearchProcessor.HOSTS, "127.0.0.1:9300");
runner.setProperty(AbstractElasticsearchProcessor.PING_TIMEOUT, "5s");
runner.setProperty(AbstractElasticsearchProcessor.SAMPLER_INTERVAL, "5s");
runner.setProperty(PutElasticsearch.INDEX, "doc");
runner.assertNotValid();
runner.setProperty(PutElasticsearch.TYPE, "status");
runner.setProperty(PutElasticsearch.BATCH_SIZE, "1");
runner.assertNotValid();
runner.setProperty(PutElasticsearch.ID_ATTRIBUTE, "doc_id");
runner.assertValid();
runner.setProperty(PutElasticsearch.INDEX_OP, "index_fail");
runner.assertValid();
runner.enqueue(docExample, new HashMap<String, String>() {{
put("doc_id", "28039652140");
}});
runner.run(1, true, true);
runner.assertAllFlowFilesTransferred(PutElasticsearch.REL_FAILURE, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(PutElasticsearch.REL_FAILURE).get(0);
assertNotNull(out);
}
/**
* A Test class that extends the processor in order to inject/mock behavior
*/