NIFI-3223 added support for expression language

- EXCHANGE
- ROUTING_KEY

This closes #1449.
This commit is contained in:
Oleg Zhurakousky 2017-01-27 13:41:54 -05:00 committed by Pierre Villard
parent 839dd5ac18
commit 027fbf48b8
4 changed files with 48 additions and 75 deletions

View File

@ -19,8 +19,6 @@ package org.apache.nifi.amqp.processors;
import java.io.IOException; import java.io.IOException;
import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.logging.ComponentLog;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.rabbitmq.client.AMQP.BasicProperties; import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Connection; import com.rabbitmq.client.Connection;
@ -32,50 +30,21 @@ import com.rabbitmq.client.ReturnListener;
*/ */
final class AMQPPublisher extends AMQPWorker { final class AMQPPublisher extends AMQPWorker {
private final static Logger logger = LoggerFactory.getLogger(AMQPPublisher.class);
private final String exchangeName;
private final String routingKey;
private final ComponentLog processLog; private final ComponentLog processLog;
private final String connectionString;
/** /**
* Creates an instance of this publisher * Creates an instance of this publisher
* *
* @param connection * @param connection
* instance of AMQP {@link Connection} * instance of AMQP {@link Connection}
* @param exchangeName
* the name of AMQP exchange to which messages will be published.
* If not provided 'default' exchange will be used.
* @param routingKey
* (required) the name of the routingKey to be used by AMQP-based
* system to route messages to its final destination (queue).
*/ */
AMQPPublisher(Connection connection, String exchangeName, String routingKey, ComponentLog processLog) { AMQPPublisher(Connection connection, ComponentLog processLog) {
super(connection); super(connection);
this.processLog = processLog; this.processLog = processLog;
this.validateStringProperty("routingKey", routingKey);
this.exchangeName = exchangeName == null ? "" : exchangeName.trim();
if (this.exchangeName.length() == 0) {
logger.info("The 'exchangeName' is not specified. Messages will be sent to default exchange");
}
this.routingKey = routingKey;
this.channel.addReturnListener(new UndeliverableMessageLogger()); this.channel.addReturnListener(new UndeliverableMessageLogger());
logger.info("Successfully connected AMQPPublisher to " + connection.toString() + " and '" + this.exchangeName this.connectionString = connection.toString();
+ "' exchange with '" + routingKey + "' as a routing key.");
}
/**
* Publishes message without any AMQP properties (see
* {@link BasicProperties}) to a pre-defined AMQP Exchange.
*
* @param bytes
* bytes representing a message.
*/
void publish(byte[] bytes) {
this.publish(bytes, null);
} }
/** /**
@ -86,14 +55,28 @@ final class AMQPPublisher extends AMQPWorker {
* bytes representing a message. * bytes representing a message.
* @param properties * @param properties
* instance of {@link BasicProperties} * instance of {@link BasicProperties}
* @param exchange
* the name of AMQP exchange to which messages will be published.
* If not provided 'default' exchange will be used.
* @param routingKey
* (required) the name of the routingKey to be used by AMQP-based
* system to route messages to its final destination (queue).
*/ */
void publish(byte[] bytes, BasicProperties properties) { void publish(byte[] bytes, BasicProperties properties, String routingKey, String exchange) {
this.validateStringProperty("routingKey", routingKey);
exchange = exchange == null ? "" : exchange.trim();
if (exchange.length() == 0) {
processLog.info("The 'exchangeName' is not specified. Messages will be sent to default exchange");
}
processLog.info("Successfully connected AMQPPublisher to " + this.connectionString + " and '" + exchange
+ "' exchange with '" + routingKey + "' as a routing key.");
if (this.channel.isOpen()) { if (this.channel.isOpen()) {
try { try {
this.channel.basicPublish(this.exchangeName, this.routingKey, true, properties, bytes); this.channel.basicPublish(exchange, routingKey, true, properties, bytes);
} catch (Exception e) { } catch (Exception e) {
throw new IllegalStateException("Failed to publish to '" + throw new IllegalStateException("Failed to publish to '" +
this.exchangeName + "' with '" + this.routingKey + "'.", e); exchange + "' with '" + routingKey + "'.", e);
} }
} else { } else {
throw new IllegalStateException("This instance of AMQPPublisher is invalid since " throw new IllegalStateException("This instance of AMQPPublisher is invalid since "
@ -106,7 +89,7 @@ final class AMQPPublisher extends AMQPWorker {
*/ */
@Override @Override
public String toString() { public String toString() {
return super.toString() + ", EXCHANGE:" + this.exchangeName + ", ROUTING_KEY:" + this.routingKey; return this.connectionString;
} }
/** /**
@ -127,7 +110,7 @@ final class AMQPPublisher extends AMQPWorker {
throws IOException { throws IOException {
String logMessage = "Message destined for '" + exchangeName + "' exchange with '" + routingKey String logMessage = "Message destined for '" + exchangeName + "' exchange with '" + routingKey
+ "' as routing key came back with replyCode=" + replyCode + " and replyText=" + replyText + "."; + "' as routing key came back with replyCode=" + replyCode + " and replyText=" + replyText + ".";
logger.warn(logMessage); processLog.warn(logMessage);
AMQPPublisher.this.processLog.warn(logMessage); AMQPPublisher.this.processLog.warn(logMessage);
} }
} }

View File

@ -70,6 +70,7 @@ public class PublishAMQP extends AbstractAMQPProcessor<AMQPPublisher> {
+ "It is an optional property. If kept empty the messages will be sent to a default AMQP exchange.") + "It is an optional property. If kept empty the messages will be sent to a default AMQP exchange.")
.required(true) .required(true)
.defaultValue("") .defaultValue("")
.expressionLanguageSupported(true)
.addValidator(Validator.VALID) .addValidator(Validator.VALID)
.build(); .build();
public static final PropertyDescriptor ROUTING_KEY = new PropertyDescriptor.Builder() public static final PropertyDescriptor ROUTING_KEY = new PropertyDescriptor.Builder()
@ -79,6 +80,7 @@ public class PublishAMQP extends AbstractAMQPProcessor<AMQPPublisher> {
+ "corresponds to a destination queue name, otherwise a binding from the Exchange to a Queue via Routing Key must be set " + "corresponds to a destination queue name, otherwise a binding from the Exchange to a Queue via Routing Key must be set "
+ "(usually by the AMQP administrator)") + "(usually by the AMQP administrator)")
.required(true) .required(true)
.expressionLanguageSupported(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build(); .build();
@ -130,15 +132,19 @@ public class PublishAMQP extends AbstractAMQPProcessor<AMQPPublisher> {
FlowFile flowFile = processSession.get(); FlowFile flowFile = processSession.get();
if (flowFile != null) { if (flowFile != null) {
BasicProperties amqpProperties = this.extractAmqpPropertiesFromFlowFile(flowFile); BasicProperties amqpProperties = this.extractAmqpPropertiesFromFlowFile(flowFile);
String routingKey = context.getProperty(ROUTING_KEY).evaluateAttributeExpressions(flowFile).getValue();
if (routingKey == null){
throw new IllegalArgumentException("Failed to determine 'routing key' with provided value '"
+ context.getProperty(ROUTING_KEY) + "' after evaluating it as expression against incoming FlowFile.");
}
String exchange = context.getProperty(EXCHANGE).evaluateAttributeExpressions(flowFile).getValue();
byte[] messageContent = this.extractMessage(flowFile, processSession); byte[] messageContent = this.extractMessage(flowFile, processSession);
try { try {
this.targetResource.publish(messageContent, amqpProperties); this.targetResource.publish(messageContent, amqpProperties, routingKey, exchange);
processSession.transfer(flowFile, REL_SUCCESS); processSession.transfer(flowFile, REL_SUCCESS);
processSession.getProvenanceReporter().send(flowFile, processSession.getProvenanceReporter().send(flowFile, this.amqpConnection.toString() + "/E:" + exchange + "/RK:" + routingKey);
this.amqpConnection.toString() + "/E:" + context.getProperty(EXCHANGE).getValue() + "/RK:"
+ context.getProperty(ROUTING_KEY).getValue());
} catch (Exception e) { } catch (Exception e) {
processSession.transfer(processSession.penalize(flowFile), REL_FAILURE); processSession.transfer(processSession.penalize(flowFile), REL_FAILURE);
this.getLogger().error("Failed while sending message to AMQP via " + this.targetResource, e); this.getLogger().error("Failed while sending message to AMQP via " + this.targetResource, e);
@ -168,9 +174,7 @@ public class PublishAMQP extends AbstractAMQPProcessor<AMQPPublisher> {
*/ */
@Override @Override
protected AMQPPublisher finishBuildingTargetResource(ProcessContext context) { protected AMQPPublisher finishBuildingTargetResource(ProcessContext context) {
String exchangeName = context.getProperty(EXCHANGE).getValue(); return new AMQPPublisher(this.amqpConnection, this.getLogger());
String routingKey = context.getProperty(ROUTING_KEY).getValue();
return new AMQPPublisher(this.amqpConnection, exchangeName, routingKey, this.getLogger());
} }
/** /**

View File

@ -17,7 +17,6 @@
package org.apache.nifi.amqp.processors; package org.apache.nifi.amqp.processors;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.atMost; import static org.mockito.Mockito.atMost;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
@ -27,6 +26,7 @@ import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.util.MockComponentLog; import org.apache.nifi.util.MockComponentLog;
import org.junit.Test; import org.junit.Test;
import org.mockito.Mockito; import org.mockito.Mockito;
@ -40,31 +40,24 @@ public class AMQPPublisherTest {
@SuppressWarnings("resource") @SuppressWarnings("resource")
@Test(expected = IllegalArgumentException.class) @Test(expected = IllegalArgumentException.class)
public void failOnNullConnection() { public void failOnNullConnection() {
new AMQPPublisher(null, null, null, null); new AMQPPublisher(null, null);
}
@SuppressWarnings("resource")
@Test(expected = IllegalArgumentException.class)
public void failOnMissingRoutingKey() throws Exception {
Connection conn = new TestConnection(null, null);
new AMQPPublisher(conn, null, "", null);
} }
@Test(expected = IllegalStateException.class) @Test(expected = IllegalStateException.class)
public void failPublishIfChannelClosed() throws Exception { public void failPublishIfChannelClosed() throws Exception {
Connection conn = new TestConnection(null, null); Connection conn = new TestConnection(null, null);
try (AMQPPublisher sender = new AMQPPublisher(conn, null, "foo", null)) { try (AMQPPublisher sender = new AMQPPublisher(conn, mock(ComponentLog.class))) {
conn.close(); conn.close();
sender.publish("oleg".getBytes()); sender.publish("oleg".getBytes(), null, "foo", "");
} }
} }
@Test(expected = IllegalStateException.class) @Test(expected = IllegalStateException.class)
public void failPublishIfChannelFails() throws Exception { public void failPublishIfChannelFails() throws Exception {
TestConnection conn = new TestConnection(null, null); TestConnection conn = new TestConnection(null, null);
try (AMQPPublisher sender = new AMQPPublisher(conn, null, "foo", null)) { try (AMQPPublisher sender = new AMQPPublisher(conn, mock(ComponentLog.class))) {
((TestChannel) conn.createChannel()).corruptChannel(); ((TestChannel) conn.createChannel()).corruptChannel();
sender.publish("oleg".getBytes()); sender.publish("oleg".getBytes(), null, "foo", "");
} }
} }
@ -77,8 +70,8 @@ public class AMQPPublisherTest {
Connection connection = new TestConnection(exchangeToRoutingKeymap, routingMap); Connection connection = new TestConnection(exchangeToRoutingKeymap, routingMap);
try (AMQPPublisher sender = new AMQPPublisher(connection, "myExchange", "key1", null)) { try (AMQPPublisher sender = new AMQPPublisher(connection, mock(ComponentLog.class))) {
sender.publish("hello".getBytes()); sender.publish("hello".getBytes(), null, "key1", "myExchange");
Thread.sleep(200); Thread.sleep(200);
} }
@ -100,9 +93,8 @@ public class AMQPPublisherTest {
ReturnListener retListener = mock(ReturnListener.class); ReturnListener retListener = mock(ReturnListener.class);
connection.createChannel().addReturnListener(retListener); connection.createChannel().addReturnListener(retListener);
try (AMQPPublisher sender = new AMQPPublisher(connection, "myExchange", "key2", try (AMQPPublisher sender = new AMQPPublisher(connection, new MockComponentLog("foo", ""))) {
new MockComponentLog("foo", ""))) { sender.publish("hello".getBytes(), null, "key1", "myExchange");
sender.publish("hello".getBytes());
Thread.sleep(1000); Thread.sleep(1000);
} }
Thread.sleep(200); Thread.sleep(200);
@ -111,12 +103,4 @@ public class AMQPPublisherTest {
connection.close(); connection.close();
} }
@Test
public void validateToString() throws Exception {
TestConnection conn = new TestConnection(null, null);
try (AMQPPublisher sender = new AMQPPublisher(conn, "myExchange", "key1", null)) {
String toString = sender.toString();
assertTrue(toString.contains("EXCHANGE:myExchange, ROUTING_KEY:key1"));
}
}
} }

View File

@ -17,12 +17,14 @@
package org.apache.nifi.amqp.processors; package org.apache.nifi.amqp.processors;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotNull;
import static org.mockito.Mockito.mock;
import java.util.Arrays; import java.util.Arrays;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.exception.ProcessException;
@ -46,8 +48,8 @@ public class ConsumeAMQPTest {
Connection connection = new TestConnection(exchangeToRoutingKeymap, routingMap); Connection connection = new TestConnection(exchangeToRoutingKeymap, routingMap);
try (AMQPPublisher sender = new AMQPPublisher(connection, "myExchange", "key1", null)) { try (AMQPPublisher sender = new AMQPPublisher(connection, mock(ComponentLog.class))) {
sender.publish("hello".getBytes(), MessageProperties.PERSISTENT_TEXT_PLAIN); sender.publish("hello".getBytes(), MessageProperties.PERSISTENT_TEXT_PLAIN, "key1", "myExchange");
ConsumeAMQP pubProc = new LocalConsumeAMQP(connection); ConsumeAMQP pubProc = new LocalConsumeAMQP(connection);
TestRunner runner = TestRunners.newTestRunner(pubProc); TestRunner runner = TestRunners.newTestRunner(pubProc);