NIFI-5489: Add expression language support to AMQP processors HOST, VHOST and USER Fields.

This closes #2936

Signed-off-by: zenfenan <zenfenan@apache.org>
This commit is contained in:
Daniel Jimenez 2018-08-04 11:11:28 -05:00 committed by zenfenan
parent 32ee552ada
commit 3731cc8558

View File

@ -28,6 +28,7 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.authentication.exception.ProviderCreationException; import org.apache.nifi.authentication.exception.ProviderCreationException;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.ProcessSession;
@ -56,27 +57,31 @@ abstract class AbstractAMQPProcessor<T extends AMQPWorker> extends AbstractProce
.description("Network address of AMQP broker (e.g., localhost)") .description("Network address of AMQP broker (e.g., localhost)")
.required(true) .required(true)
.defaultValue("localhost") .defaultValue("localhost")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
.build(); .build();
public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder() public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder()
.name("Port") .name("Port")
.description("Numeric value identifying Port of AMQP broker (e.g., 5671)") .description("Numeric value identifying Port of AMQP broker (e.g., 5671)")
.required(true) .required(true)
.defaultValue("5672") .defaultValue("5672")
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(StandardValidators.PORT_VALIDATOR) .addValidator(StandardValidators.PORT_VALIDATOR)
.build(); .build();
public static final PropertyDescriptor V_HOST = new PropertyDescriptor.Builder() public static final PropertyDescriptor V_HOST = new PropertyDescriptor.Builder()
.name("Virtual Host") .name("Virtual Host")
.description("Virtual Host name which segregates AMQP system for enhanced security.") .description("Virtual Host name which segregates AMQP system for enhanced security.")
.required(false) .required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
.build(); .build();
public static final PropertyDescriptor USER = new PropertyDescriptor.Builder() public static final PropertyDescriptor USER = new PropertyDescriptor.Builder()
.name("User Name") .name("User Name")
.description("User Name used for authentication and authorization.") .description("User Name used for authentication and authorization.")
.required(true) .required(true)
.defaultValue("guest") .defaultValue("guest")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
.build(); .build();
public static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder() public static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder()
.name("Password") .name("Password")
@ -145,7 +150,7 @@ abstract class AbstractAMQPProcessor<T extends AMQPWorker> extends AbstractProce
/** /**
* Will builds target resource ({@link AMQPPublisher} or {@link AMQPConsumer}) upon first invocation and will delegate to the * Will builds target resource ({@link AMQPPublisher} or {@link AMQPConsumer}) upon first invocation and will delegate to the
* implementation of {@link #processResource(ProcessContext, ProcessSession)} method for further processing. * implementation of {@link #processResource} method for further processing.
*/ */
@Override @Override
public final void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { public final void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
@ -204,12 +209,12 @@ abstract class AbstractAMQPProcessor<T extends AMQPWorker> extends AbstractProce
protected Connection createConnection(ProcessContext context) { protected Connection createConnection(ProcessContext context) {
final ConnectionFactory cf = new ConnectionFactory(); final ConnectionFactory cf = new ConnectionFactory();
cf.setHost(context.getProperty(HOST).getValue()); cf.setHost(context.getProperty(HOST).evaluateAttributeExpressions().getValue());
cf.setPort(Integer.parseInt(context.getProperty(PORT).getValue())); cf.setPort(Integer.parseInt(context.getProperty(PORT).evaluateAttributeExpressions().getValue()));
cf.setUsername(context.getProperty(USER).getValue()); cf.setUsername(context.getProperty(USER).evaluateAttributeExpressions().getValue());
cf.setPassword(context.getProperty(PASSWORD).getValue()); cf.setPassword(context.getProperty(PASSWORD).getValue());
final String vHost = context.getProperty(V_HOST).getValue(); final String vHost = context.getProperty(V_HOST).evaluateAttributeExpressions().getValue();
if (vHost != null) { if (vHost != null) {
cf.setVirtualHost(vHost); cf.setVirtualHost(vHost);
} }