mirror of https://github.com/apache/nifi.git
NIFI-3885 DynamoDB Processor EL Support
Add EL support to remaining Dynamo processor properties Signed-off-by: James Wing <jvwing@gmail.com> This closes #1793.
This commit is contained in:
parent
196ca237e6
commit
20a1fc24d7
|
@ -86,7 +86,7 @@ public abstract class AbstractDynamoDBProcessor extends AbstractAWSCredentialsPr
|
||||||
public static final PropertyDescriptor TABLE = new PropertyDescriptor.Builder()
|
public static final PropertyDescriptor TABLE = new PropertyDescriptor.Builder()
|
||||||
.name("Table Name")
|
.name("Table Name")
|
||||||
.required(true)
|
.required(true)
|
||||||
.expressionLanguageSupported(false)
|
.expressionLanguageSupported(true)
|
||||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||||
.description("The DynamoDB table name")
|
.description("The DynamoDB table name")
|
||||||
.build();
|
.build();
|
||||||
|
@ -127,6 +127,7 @@ public abstract class AbstractDynamoDBProcessor extends AbstractAWSCredentialsPr
|
||||||
public static final PropertyDescriptor HASH_KEY_NAME = new PropertyDescriptor.Builder()
|
public static final PropertyDescriptor HASH_KEY_NAME = new PropertyDescriptor.Builder()
|
||||||
.name("Hash Key Name")
|
.name("Hash Key Name")
|
||||||
.required(true)
|
.required(true)
|
||||||
|
.expressionLanguageSupported(true)
|
||||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||||
.description("The hash key name of the item")
|
.description("The hash key name of the item")
|
||||||
.build();
|
.build();
|
||||||
|
@ -134,6 +135,7 @@ public abstract class AbstractDynamoDBProcessor extends AbstractAWSCredentialsPr
|
||||||
public static final PropertyDescriptor RANGE_KEY_NAME = new PropertyDescriptor.Builder()
|
public static final PropertyDescriptor RANGE_KEY_NAME = new PropertyDescriptor.Builder()
|
||||||
.name("Range Key Name")
|
.name("Range Key Name")
|
||||||
.required(false)
|
.required(false)
|
||||||
|
.expressionLanguageSupported(true)
|
||||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||||
.description("The range key name of the item")
|
.description("The range key name of the item")
|
||||||
.build();
|
.build();
|
||||||
|
@ -141,6 +143,7 @@ public abstract class AbstractDynamoDBProcessor extends AbstractAWSCredentialsPr
|
||||||
public static final PropertyDescriptor JSON_DOCUMENT = new PropertyDescriptor.Builder()
|
public static final PropertyDescriptor JSON_DOCUMENT = new PropertyDescriptor.Builder()
|
||||||
.name("Json Document attribute")
|
.name("Json Document attribute")
|
||||||
.required(true)
|
.required(true)
|
||||||
|
.expressionLanguageSupported(true)
|
||||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||||
.description("The Json document to be retrieved from the dynamodb item")
|
.description("The Json document to be retrieved from the dynamodb item")
|
||||||
.build();
|
.build();
|
||||||
|
@ -148,7 +151,7 @@ public abstract class AbstractDynamoDBProcessor extends AbstractAWSCredentialsPr
|
||||||
public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
|
public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
|
||||||
.name("Batch items for each request (between 1 and 50)")
|
.name("Batch items for each request (between 1 and 50)")
|
||||||
.required(false)
|
.required(false)
|
||||||
.expressionLanguageSupported(false)
|
.expressionLanguageSupported(true)
|
||||||
.addValidator(StandardValidators.createLongValidator(1, 50, true))
|
.addValidator(StandardValidators.createLongValidator(1, 50, true))
|
||||||
.defaultValue("1")
|
.defaultValue("1")
|
||||||
.description("The items to be retrieved in one batch")
|
.description("The items to be retrieved in one batch")
|
||||||
|
@ -159,6 +162,7 @@ public abstract class AbstractDynamoDBProcessor extends AbstractAWSCredentialsPr
|
||||||
.description("Character set of data in the document")
|
.description("Character set of data in the document")
|
||||||
.addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
|
.addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
|
||||||
.required(true)
|
.required(true)
|
||||||
|
.expressionLanguageSupported(true)
|
||||||
.defaultValue(Charset.defaultCharset().name())
|
.defaultValue(Charset.defaultCharset().name())
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
|
|
|
@ -83,18 +83,18 @@ public class DeleteDynamoDB extends AbstractWriteDynamoDBProcessor {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onTrigger(final ProcessContext context, final ProcessSession session) {
|
public void onTrigger(final ProcessContext context, final ProcessSession session) {
|
||||||
List<FlowFile> flowFiles = session.get(context.getProperty(BATCH_SIZE).asInteger());
|
List<FlowFile> flowFiles = session.get(context.getProperty(BATCH_SIZE).evaluateAttributeExpressions().asInteger());
|
||||||
if (flowFiles == null || flowFiles.size() == 0) {
|
if (flowFiles == null || flowFiles.size() == 0) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
Map<ItemKeys,FlowFile> keysToFlowFileMap = new HashMap<>();
|
Map<ItemKeys,FlowFile> keysToFlowFileMap = new HashMap<>();
|
||||||
|
|
||||||
final String table = context.getProperty(TABLE).getValue();
|
final String table = context.getProperty(TABLE).evaluateAttributeExpressions().getValue();
|
||||||
|
|
||||||
final String hashKeyName = context.getProperty(HASH_KEY_NAME).getValue();
|
final String hashKeyName = context.getProperty(HASH_KEY_NAME).evaluateAttributeExpressions().getValue();
|
||||||
final String hashKeyValueType = context.getProperty(HASH_KEY_VALUE_TYPE).getValue();
|
final String hashKeyValueType = context.getProperty(HASH_KEY_VALUE_TYPE).getValue();
|
||||||
final String rangeKeyName = context.getProperty(RANGE_KEY_NAME).getValue();
|
final String rangeKeyName = context.getProperty(RANGE_KEY_NAME).evaluateAttributeExpressions().getValue();
|
||||||
final String rangeKeyValueType = context.getProperty(RANGE_KEY_VALUE_TYPE).getValue();
|
final String rangeKeyValueType = context.getProperty(RANGE_KEY_VALUE_TYPE).getValue();
|
||||||
|
|
||||||
TableWriteItems tableWriteItems = new TableWriteItems(table);
|
TableWriteItems tableWriteItems = new TableWriteItems(table);
|
||||||
|
|
|
@ -100,19 +100,19 @@ public class GetDynamoDB extends AbstractDynamoDBProcessor {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onTrigger(final ProcessContext context, final ProcessSession session) {
|
public void onTrigger(final ProcessContext context, final ProcessSession session) {
|
||||||
List<FlowFile> flowFiles = session.get(context.getProperty(BATCH_SIZE).asInteger());
|
List<FlowFile> flowFiles = session.get(context.getProperty(BATCH_SIZE).evaluateAttributeExpressions().asInteger());
|
||||||
if (flowFiles == null || flowFiles.size() == 0) {
|
if (flowFiles == null || flowFiles.size() == 0) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
Map<ItemKeys,FlowFile> keysToFlowFileMap = new HashMap<>();
|
Map<ItemKeys,FlowFile> keysToFlowFileMap = new HashMap<>();
|
||||||
|
|
||||||
final String table = context.getProperty(TABLE).getValue();
|
final String table = context.getProperty(TABLE).evaluateAttributeExpressions().getValue();
|
||||||
TableKeysAndAttributes tableKeysAndAttributes = new TableKeysAndAttributes(table);
|
TableKeysAndAttributes tableKeysAndAttributes = new TableKeysAndAttributes(table);
|
||||||
|
|
||||||
final String hashKeyName = context.getProperty(HASH_KEY_NAME).getValue();
|
final String hashKeyName = context.getProperty(HASH_KEY_NAME).evaluateAttributeExpressions().getValue();
|
||||||
final String rangeKeyName = context.getProperty(RANGE_KEY_NAME).getValue();
|
final String rangeKeyName = context.getProperty(RANGE_KEY_NAME).evaluateAttributeExpressions().getValue();
|
||||||
final String jsonDocument = context.getProperty(JSON_DOCUMENT).getValue();
|
final String jsonDocument = context.getProperty(JSON_DOCUMENT).evaluateAttributeExpressions().getValue();
|
||||||
|
|
||||||
for (FlowFile flowFile : flowFiles) {
|
for (FlowFile flowFile : flowFiles) {
|
||||||
final Object hashKeyValue = getValue(context, HASH_KEY_VALUE_TYPE, HASH_KEY_VALUE, flowFile);
|
final Object hashKeyValue = getValue(context, HASH_KEY_VALUE_TYPE, HASH_KEY_VALUE, flowFile);
|
||||||
|
|
|
@ -95,21 +95,21 @@ public class PutDynamoDB extends AbstractWriteDynamoDBProcessor {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onTrigger(final ProcessContext context, final ProcessSession session) {
|
public void onTrigger(final ProcessContext context, final ProcessSession session) {
|
||||||
List<FlowFile> flowFiles = session.get(context.getProperty(BATCH_SIZE).asInteger());
|
List<FlowFile> flowFiles = session.get(context.getProperty(BATCH_SIZE).evaluateAttributeExpressions().asInteger());
|
||||||
if (flowFiles == null || flowFiles.size() == 0) {
|
if (flowFiles == null || flowFiles.size() == 0) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
Map<ItemKeys, FlowFile> keysToFlowFileMap = new HashMap<>();
|
Map<ItemKeys, FlowFile> keysToFlowFileMap = new HashMap<>();
|
||||||
|
|
||||||
final String table = context.getProperty(TABLE).getValue();
|
final String table = context.getProperty(TABLE).evaluateAttributeExpressions().getValue();
|
||||||
|
|
||||||
final String hashKeyName = context.getProperty(HASH_KEY_NAME).getValue();
|
final String hashKeyName = context.getProperty(HASH_KEY_NAME).evaluateAttributeExpressions().getValue();
|
||||||
final String hashKeyValueType = context.getProperty(HASH_KEY_VALUE_TYPE).getValue();
|
final String hashKeyValueType = context.getProperty(HASH_KEY_VALUE_TYPE).getValue();
|
||||||
final String rangeKeyName = context.getProperty(RANGE_KEY_NAME).getValue();
|
final String rangeKeyName = context.getProperty(RANGE_KEY_NAME).evaluateAttributeExpressions().getValue();
|
||||||
final String rangeKeyValueType = context.getProperty(RANGE_KEY_VALUE_TYPE).getValue();
|
final String rangeKeyValueType = context.getProperty(RANGE_KEY_VALUE_TYPE).getValue();
|
||||||
final String jsonDocument = context.getProperty(JSON_DOCUMENT).getValue();
|
final String jsonDocument = context.getProperty(JSON_DOCUMENT).evaluateAttributeExpressions().getValue();
|
||||||
final String charset = context.getProperty(DOCUMENT_CHARSET).getValue();
|
final String charset = context.getProperty(DOCUMENT_CHARSET).evaluateAttributeExpressions().getValue();
|
||||||
|
|
||||||
TableWriteItems tableWriteItems = new TableWriteItems(table);
|
TableWriteItems tableWriteItems = new TableWriteItems(table);
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue