NIFI-4303 Add routingKey to ConsumeAMQP processor

Add exchange to ConsumeAMQP processor
Update test
Refactoring tests

This closes #4464.

Signed-off-by: Peter Turcsanyi <turcsanyi@apache.org>
This commit is contained in:
doodzio 2020-08-05 21:20:24 +02:00 committed by Peter Turcsanyi
parent f73a019f36
commit 02ee999efb
3 changed files with 14 additions and 6 deletions

View File

@ -42,6 +42,7 @@ import org.apache.nifi.processor.util.StandardValidators;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.GetResponse;
@Tags({"amqp", "rabbit", "get", "message", "receive", "consume"})
@ -63,6 +64,8 @@ import com.rabbitmq.client.GetResponse;
@WritesAttribute(attribute = "amqp$type", description = "The type of message"),
@WritesAttribute(attribute = "amqp$userId", description = "The ID of the user"),
@WritesAttribute(attribute = "amqp$clusterId", description = "The ID of the AMQP Cluster"),
@WritesAttribute(attribute = "amqp$routingKey", description = "The routingKey of the AMQP Message"),
@WritesAttribute(attribute = "amqp$exchange", description = "The exchange from which AMQP Message was received")
})
public class ConsumeAMQP extends AbstractAMQPProcessor<AMQPConsumer> {
private static final String ATTRIBUTES_PREFIX = "amqp$";
@ -146,7 +149,8 @@ public class ConsumeAMQP extends AbstractAMQPProcessor<AMQPConsumer> {
flowFile = session.write(flowFile, out -> out.write(response.getBody()));
final BasicProperties amqpProperties = response.getProps();
final Map<String, String> attributes = buildAttributes(amqpProperties);
final Envelope envelope = response.getEnvelope();
final Map<String, String> attributes = buildAttributes(amqpProperties, envelope);
flowFile = session.putAllAttributes(flowFile, attributes);
session.getProvenanceReporter().receive(flowFile, connection.toString() + "/" + context.getProperty(QUEUE).getValue());
@ -160,7 +164,7 @@ public class ConsumeAMQP extends AbstractAMQPProcessor<AMQPConsumer> {
}
}
private Map<String, String> buildAttributes(final BasicProperties properties) {
private Map<String, String> buildAttributes(final BasicProperties properties, final Envelope envelope) {
final Map<String, String> attributes = new HashMap<>();
addAttribute(attributes, ATTRIBUTES_PREFIX + "appId", properties.getAppId());
addAttribute(attributes, ATTRIBUTES_PREFIX + "contentEncoding", properties.getContentEncoding());
@ -176,6 +180,8 @@ public class ConsumeAMQP extends AbstractAMQPProcessor<AMQPConsumer> {
addAttribute(attributes, ATTRIBUTES_PREFIX + "type", properties.getType());
addAttribute(attributes, ATTRIBUTES_PREFIX + "userId", properties.getUserId());
addAttribute(attributes, ATTRIBUTES_PREFIX + "clusterId", properties.getClusterId());
addAttribute(attributes, ATTRIBUTES_PREFIX + "routingKey", envelope.getRoutingKey());
addAttribute(attributes, ATTRIBUTES_PREFIX + "exchange", envelope.getExchange());
return attributes;
}

View File

@ -40,7 +40,7 @@
The following is the list of available standard AMQP properties which may come with the message: <i>("amqp$contentType", "amqp$contentEncoding",
"amqp$headers", "amqp$deliveryMode", "amqp$priority", "amqp$correlationId", "amqp$replyTo",
"amqp$expiration", "amqp$messageId", "amqp$timestamp", "amqp$type", "amqp$userId", "amqp$appId",
"amqp$clusterId")</i>
"amqp$clusterId", "amqp$routingKey")</i>
</p>
<h2>Configuration Details</h2>
<p>

View File

@ -44,7 +44,7 @@ public class ConsumeAMQPTest {
@Test
public void testMessageAcked() throws TimeoutException, IOException {
final Map<String, List<String>> routingMap = Collections.singletonMap("key1", Arrays.asList("queue1"));
final Map<String, List<String>> routingMap = Collections.singletonMap("key1", Collections.singletonList("queue1"));
final Map<String, String> exchangeToRoutingKeymap = Collections.singletonMap("myExchange", "key1");
final Connection connection = new TestConnection(exchangeToRoutingKeymap, routingMap);
@ -75,7 +75,7 @@ public class ConsumeAMQPTest {
@Test
public void testBatchSizeAffectsAcks() throws TimeoutException, IOException {
final Map<String, List<String>> routingMap = Collections.singletonMap("key1", Arrays.asList("queue1"));
final Map<String, List<String>> routingMap = Collections.singletonMap("key1", Collections.singletonList("queue1"));
final Map<String, String> exchangeToRoutingKeymap = Collections.singletonMap("myExchange", "key1");
final Connection connection = new TestConnection(exchangeToRoutingKeymap, routingMap);
@ -106,7 +106,7 @@ public class ConsumeAMQPTest {
@Test
public void testConsumerStopped() throws TimeoutException, IOException {
final Map<String, List<String>> routingMap = Collections.singletonMap("key1", Arrays.asList("queue1"));
final Map<String, List<String>> routingMap = Collections.singletonMap("key1", Collections.singletonList("queue1"));
final Map<String, String> exchangeToRoutingKeymap = Collections.singletonMap("myExchange", "key1");
final Connection connection = new TestConnection(exchangeToRoutingKeymap, routingMap);
@ -156,6 +156,8 @@ public class ConsumeAMQPTest {
runner.run();
final MockFlowFile successFF = runner.getFlowFilesForRelationship(PublishAMQP.REL_SUCCESS).get(0);
assertNotNull(successFF);
successFF.assertAttributeEquals("amqp$routingKey", "key1");
successFF.assertAttributeEquals("amqp$exchange", "myExchange");
}
}