diff --git a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPPublisher.java b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPPublisher.java index eb02a86d42..6f2fddc41d 100644 --- a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPPublisher.java +++ b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPPublisher.java @@ -19,8 +19,6 @@ package org.apache.nifi.amqp.processors; import java.io.IOException; import org.apache.nifi.logging.ComponentLog; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import com.rabbitmq.client.AMQP.BasicProperties; import com.rabbitmq.client.Connection; @@ -32,50 +30,21 @@ import com.rabbitmq.client.ReturnListener; */ final class AMQPPublisher extends AMQPWorker { - private final static Logger logger = LoggerFactory.getLogger(AMQPPublisher.class); - - private final String exchangeName; - - private final String routingKey; - private final ComponentLog processLog; + private final String connectionString; + /** * Creates an instance of this publisher * * @param connection * instance of AMQP {@link Connection} - * @param exchangeName - * the name of AMQP exchange to which messages will be published. - * If not provided 'default' exchange will be used. - * @param routingKey - * (required) the name of the routingKey to be used by AMQP-based - * system to route messages to its final destination (queue). */ - AMQPPublisher(Connection connection, String exchangeName, String routingKey, ComponentLog processLog) { + AMQPPublisher(Connection connection, ComponentLog processLog) { super(connection); this.processLog = processLog; - this.validateStringProperty("routingKey", routingKey); - this.exchangeName = exchangeName == null ? "" : exchangeName.trim(); - if (this.exchangeName.length() == 0) { - logger.info("The 'exchangeName' is not specified. Messages will be sent to default exchange"); - } - - this.routingKey = routingKey; this.channel.addReturnListener(new UndeliverableMessageLogger()); - logger.info("Successfully connected AMQPPublisher to " + connection.toString() + " and '" + this.exchangeName - + "' exchange with '" + routingKey + "' as a routing key."); - } - - /** - * Publishes message without any AMQP properties (see - * {@link BasicProperties}) to a pre-defined AMQP Exchange. - * - * @param bytes - * bytes representing a message. - */ - void publish(byte[] bytes) { - this.publish(bytes, null); + this.connectionString = connection.toString(); } /** @@ -86,14 +55,28 @@ final class AMQPPublisher extends AMQPWorker { * bytes representing a message. * @param properties * instance of {@link BasicProperties} + * @param exchange + * the name of AMQP exchange to which messages will be published. + * If not provided 'default' exchange will be used. + * @param routingKey + * (required) the name of the routingKey to be used by AMQP-based + * system to route messages to its final destination (queue). */ - void publish(byte[] bytes, BasicProperties properties) { + void publish(byte[] bytes, BasicProperties properties, String routingKey, String exchange) { + this.validateStringProperty("routingKey", routingKey); + exchange = exchange == null ? "" : exchange.trim(); + if (exchange.length() == 0) { + processLog.info("The 'exchangeName' is not specified. Messages will be sent to default exchange"); + } + processLog.info("Successfully connected AMQPPublisher to " + this.connectionString + " and '" + exchange + + "' exchange with '" + routingKey + "' as a routing key."); + if (this.channel.isOpen()) { try { - this.channel.basicPublish(this.exchangeName, this.routingKey, true, properties, bytes); + this.channel.basicPublish(exchange, routingKey, true, properties, bytes); } catch (Exception e) { throw new IllegalStateException("Failed to publish to '" + - this.exchangeName + "' with '" + this.routingKey + "'.", e); + exchange + "' with '" + routingKey + "'.", e); } } else { throw new IllegalStateException("This instance of AMQPPublisher is invalid since " @@ -106,7 +89,7 @@ final class AMQPPublisher extends AMQPWorker { */ @Override public String toString() { - return super.toString() + ", EXCHANGE:" + this.exchangeName + ", ROUTING_KEY:" + this.routingKey; + return this.connectionString; } /** @@ -127,7 +110,7 @@ final class AMQPPublisher extends AMQPWorker { throws IOException { String logMessage = "Message destined for '" + exchangeName + "' exchange with '" + routingKey + "' as routing key came back with replyCode=" + replyCode + " and replyText=" + replyText + "."; - logger.warn(logMessage); + processLog.warn(logMessage); AMQPPublisher.this.processLog.warn(logMessage); } } diff --git a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java index 85116c2540..330346f1bf 100644 --- a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java +++ b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java @@ -70,6 +70,7 @@ public class PublishAMQP extends AbstractAMQPProcessor { + "It is an optional property. If kept empty the messages will be sent to a default AMQP exchange.") .required(true) .defaultValue("") + .expressionLanguageSupported(true) .addValidator(Validator.VALID) .build(); public static final PropertyDescriptor ROUTING_KEY = new PropertyDescriptor.Builder() @@ -79,6 +80,7 @@ public class PublishAMQP extends AbstractAMQPProcessor { + "corresponds to a destination queue name, otherwise a binding from the Exchange to a Queue via Routing Key must be set " + "(usually by the AMQP administrator)") .required(true) + .expressionLanguageSupported(true) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); @@ -130,15 +132,19 @@ public class PublishAMQP extends AbstractAMQPProcessor { FlowFile flowFile = processSession.get(); if (flowFile != null) { BasicProperties amqpProperties = this.extractAmqpPropertiesFromFlowFile(flowFile); + String routingKey = context.getProperty(ROUTING_KEY).evaluateAttributeExpressions(flowFile).getValue(); + if (routingKey == null){ + throw new IllegalArgumentException("Failed to determine 'routing key' with provided value '" + + context.getProperty(ROUTING_KEY) + "' after evaluating it as expression against incoming FlowFile."); + } + String exchange = context.getProperty(EXCHANGE).evaluateAttributeExpressions(flowFile).getValue(); byte[] messageContent = this.extractMessage(flowFile, processSession); try { - this.targetResource.publish(messageContent, amqpProperties); + this.targetResource.publish(messageContent, amqpProperties, routingKey, exchange); processSession.transfer(flowFile, REL_SUCCESS); - processSession.getProvenanceReporter().send(flowFile, - this.amqpConnection.toString() + "/E:" + context.getProperty(EXCHANGE).getValue() + "/RK:" - + context.getProperty(ROUTING_KEY).getValue()); + processSession.getProvenanceReporter().send(flowFile, this.amqpConnection.toString() + "/E:" + exchange + "/RK:" + routingKey); } catch (Exception e) { processSession.transfer(processSession.penalize(flowFile), REL_FAILURE); this.getLogger().error("Failed while sending message to AMQP via " + this.targetResource, e); @@ -168,9 +174,7 @@ public class PublishAMQP extends AbstractAMQPProcessor { */ @Override protected AMQPPublisher finishBuildingTargetResource(ProcessContext context) { - String exchangeName = context.getProperty(EXCHANGE).getValue(); - String routingKey = context.getProperty(ROUTING_KEY).getValue(); - return new AMQPPublisher(this.amqpConnection, exchangeName, routingKey, this.getLogger()); + return new AMQPPublisher(this.amqpConnection, this.getLogger()); } /** diff --git a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPPublisherTest.java b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPPublisherTest.java index c8d30f78e7..33844c308c 100644 --- a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPPublisherTest.java +++ b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPPublisherTest.java @@ -17,7 +17,6 @@ package org.apache.nifi.amqp.processors; import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.atMost; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; @@ -27,6 +26,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.util.MockComponentLog; import org.junit.Test; import org.mockito.Mockito; @@ -40,31 +40,24 @@ public class AMQPPublisherTest { @SuppressWarnings("resource") @Test(expected = IllegalArgumentException.class) public void failOnNullConnection() { - new AMQPPublisher(null, null, null, null); - } - - @SuppressWarnings("resource") - @Test(expected = IllegalArgumentException.class) - public void failOnMissingRoutingKey() throws Exception { - Connection conn = new TestConnection(null, null); - new AMQPPublisher(conn, null, "", null); + new AMQPPublisher(null, null); } @Test(expected = IllegalStateException.class) public void failPublishIfChannelClosed() throws Exception { Connection conn = new TestConnection(null, null); - try (AMQPPublisher sender = new AMQPPublisher(conn, null, "foo", null)) { + try (AMQPPublisher sender = new AMQPPublisher(conn, mock(ComponentLog.class))) { conn.close(); - sender.publish("oleg".getBytes()); + sender.publish("oleg".getBytes(), null, "foo", ""); } } @Test(expected = IllegalStateException.class) public void failPublishIfChannelFails() throws Exception { TestConnection conn = new TestConnection(null, null); - try (AMQPPublisher sender = new AMQPPublisher(conn, null, "foo", null)) { + try (AMQPPublisher sender = new AMQPPublisher(conn, mock(ComponentLog.class))) { ((TestChannel) conn.createChannel()).corruptChannel(); - sender.publish("oleg".getBytes()); + sender.publish("oleg".getBytes(), null, "foo", ""); } } @@ -77,8 +70,8 @@ public class AMQPPublisherTest { Connection connection = new TestConnection(exchangeToRoutingKeymap, routingMap); - try (AMQPPublisher sender = new AMQPPublisher(connection, "myExchange", "key1", null)) { - sender.publish("hello".getBytes()); + try (AMQPPublisher sender = new AMQPPublisher(connection, mock(ComponentLog.class))) { + sender.publish("hello".getBytes(), null, "key1", "myExchange"); Thread.sleep(200); } @@ -100,9 +93,8 @@ public class AMQPPublisherTest { ReturnListener retListener = mock(ReturnListener.class); connection.createChannel().addReturnListener(retListener); - try (AMQPPublisher sender = new AMQPPublisher(connection, "myExchange", "key2", - new MockComponentLog("foo", ""))) { - sender.publish("hello".getBytes()); + try (AMQPPublisher sender = new AMQPPublisher(connection, new MockComponentLog("foo", ""))) { + sender.publish("hello".getBytes(), null, "key1", "myExchange"); Thread.sleep(1000); } Thread.sleep(200); @@ -111,12 +103,4 @@ public class AMQPPublisherTest { connection.close(); } - @Test - public void validateToString() throws Exception { - TestConnection conn = new TestConnection(null, null); - try (AMQPPublisher sender = new AMQPPublisher(conn, "myExchange", "key1", null)) { - String toString = sender.toString(); - assertTrue(toString.contains("EXCHANGE:myExchange, ROUTING_KEY:key1")); - } - } } diff --git a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/ConsumeAMQPTest.java b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/ConsumeAMQPTest.java index 3a2754db34..52b48d87b4 100644 --- a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/ConsumeAMQPTest.java +++ b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/ConsumeAMQPTest.java @@ -17,12 +17,14 @@ package org.apache.nifi.amqp.processors; import static org.junit.Assert.assertNotNull; +import static org.mockito.Mockito.mock; import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; +import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.exception.ProcessException; @@ -46,8 +48,8 @@ public class ConsumeAMQPTest { Connection connection = new TestConnection(exchangeToRoutingKeymap, routingMap); - try (AMQPPublisher sender = new AMQPPublisher(connection, "myExchange", "key1", null)) { - sender.publish("hello".getBytes(), MessageProperties.PERSISTENT_TEXT_PLAIN); + try (AMQPPublisher sender = new AMQPPublisher(connection, mock(ComponentLog.class))) { + sender.publish("hello".getBytes(), MessageProperties.PERSISTENT_TEXT_PLAIN, "key1", "myExchange"); ConsumeAMQP pubProc = new LocalConsumeAMQP(connection); TestRunner runner = TestRunners.newTestRunner(pubProc);