NIFI-1666: Fixed bug with EL evaluation in PutElasticsearch processor

This commit is contained in:
Matt Burgess 2016-03-22 14:00:25 -04:00
parent 4babd067c1
commit bbbc77707f
2 changed files with 60 additions and 5 deletions

View File

@ -146,9 +146,7 @@ public class PutElasticsearch extends AbstractElasticsearchProcessor {
@Override @Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
final int batchSize = context.getProperty(BATCH_SIZE).asInteger(); final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
final String index = context.getProperty(INDEX).evaluateAttributeExpressions().getValue();
final String id_attribute = context.getProperty(ID_ATTRIBUTE).getValue(); final String id_attribute = context.getProperty(ID_ATTRIBUTE).getValue();
final String docType = context.getProperty(TYPE).evaluateAttributeExpressions().getValue();
final Charset charset = Charset.forName(context.getProperty(CHARSET).getValue()); final Charset charset = Charset.forName(context.getProperty(CHARSET).getValue());
final List<FlowFile> flowFiles = session.get(batchSize); final List<FlowFile> flowFiles = session.get(batchSize);
@ -166,6 +164,9 @@ public class PutElasticsearch extends AbstractElasticsearchProcessor {
} }
for (FlowFile file : flowFiles) { for (FlowFile file : flowFiles) {
final String index = context.getProperty(INDEX).evaluateAttributeExpressions(file).getValue();
final String docType = context.getProperty(TYPE).evaluateAttributeExpressions(file).getValue();
final String id = file.getAttribute(id_attribute); final String id = file.getAttribute(id_attribute);
if (id == null) { if (id == null) {
logger.error("No value in identifier attribute {} for {}, transferring to failure", new Object[]{id_attribute, file}); logger.error("No value in identifier attribute {} for {}, transferring to failure", new Object[]{id_attribute, file});

View File

@ -41,6 +41,8 @@ import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Ignore; import org.junit.Ignore;
import org.junit.Test; import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import java.io.IOException; import java.io.IOException;
@ -209,6 +211,43 @@ public class TestPutElasticsearch {
assertNotNull(out); assertNotNull(out);
} }
@Test
public void testPutElasticsearchOnTriggerWithIndexFromAttribute() throws IOException {
runner = TestRunners.newTestRunner(new PutElasticsearchTestProcessor(false));
runner.setValidateExpressionUsage(false);
runner.setProperty(AbstractElasticsearchProcessor.CLUSTER_NAME, "elasticsearch");
runner.setProperty(AbstractElasticsearchProcessor.HOSTS, "127.0.0.1:9300");
runner.setProperty(AbstractElasticsearchProcessor.PING_TIMEOUT, "5s");
runner.setProperty(AbstractElasticsearchProcessor.SAMPLER_INTERVAL, "5s");
runner.setProperty(PutElasticsearch.INDEX, "${i}");
runner.setProperty(PutElasticsearch.TYPE, "${type}");
runner.setProperty(PutElasticsearch.BATCH_SIZE, "1");
runner.setProperty(PutElasticsearch.ID_ATTRIBUTE, "doc_id");
runner.enqueue(docExample, new HashMap<String, String>() {{
put("doc_id", "28039652144");
put("i", "doc");
put("type", "status");
}});
runner.run(1, true, true);
runner.assertAllFlowFilesTransferred(PutElasticsearch.REL_SUCCESS, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(PutElasticsearch.REL_SUCCESS).get(0);
assertNotNull(out);
runner.clearTransferState();
// Now try an empty attribute value, should fail
runner.enqueue(docExample, new HashMap<String, String>() {{
put("doc_id", "28039652144");
put("type", "status");
}});
runner.run(1, true, true);
runner.assertAllFlowFilesTransferred(PutElasticsearch.REL_RETRY, 1);
final MockFlowFile out2 = runner.getFlowFilesForRelationship(PutElasticsearch.REL_RETRY).get(0);
assertNotNull(out2);
}
/** /**
* A Test class that extends the processor in order to inject/mock behavior * A Test class that extends the processor in order to inject/mock behavior
*/ */
@ -226,7 +265,7 @@ public class TestPutElasticsearch {
@Override @Override
public void createElasticsearchClient(ProcessContext context) throws ProcessException { public void createElasticsearchClient(ProcessContext context) throws ProcessException {
Client mockClient = mock(Client.class); final Client mockClient = mock(Client.class);
BulkRequestBuilder bulkRequestBuilder = spy(new BulkRequestBuilder(mockClient, BulkAction.INSTANCE)); BulkRequestBuilder bulkRequestBuilder = spy(new BulkRequestBuilder(mockClient, BulkAction.INSTANCE));
if (exceptionToThrow != null) { if (exceptionToThrow != null) {
doThrow(exceptionToThrow).when(bulkRequestBuilder).execute(); doThrow(exceptionToThrow).when(bulkRequestBuilder).execute();
@ -235,8 +274,23 @@ public class TestPutElasticsearch {
} }
when(mockClient.prepareBulk()).thenReturn(bulkRequestBuilder); when(mockClient.prepareBulk()).thenReturn(bulkRequestBuilder);
when(mockClient.prepareIndex(anyString(), anyString(), anyString())).thenAnswer(new Answer<IndexRequestBuilder>() {
@Override
public IndexRequestBuilder answer(InvocationOnMock invocationOnMock) throws Throwable {
Object[] args = invocationOnMock.getArguments();
String arg1 = (String) args[0];
if (arg1.isEmpty()) {
throw new NoNodeAvailableException("Needs index");
}
String arg2 = (String) args[1];
if (arg2.isEmpty()) {
throw new NoNodeAvailableException("Needs doc type");
} else {
IndexRequestBuilder indexRequestBuilder = new IndexRequestBuilder(mockClient, IndexAction.INSTANCE); IndexRequestBuilder indexRequestBuilder = new IndexRequestBuilder(mockClient, IndexAction.INSTANCE);
when(mockClient.prepareIndex(anyString(), anyString(), anyString())).thenReturn(indexRequestBuilder); return indexRequestBuilder;
}
}
});
esClient.set(mockClient); esClient.set(mockClient);
} }