mirror of https://github.com/apache/nifi.git
NIFI-4508: This closes #2784. Update ConsumeAMQP to use basicConsume API instead of basicGet in order to provide better performance
NIFI-4508: Added some additional unit tests to verify behavior Signed-off-by: joewitt <joewitt@apache.org>
This commit is contained in:
parent
18a4819d51
commit
e3b0949b6b
|
@ -17,13 +17,20 @@
|
|||
package org.apache.nifi.amqp.processors;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
import org.apache.nifi.processor.exception.ProcessException;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.rabbitmq.client.AMQP.BasicProperties;
|
||||
import com.rabbitmq.client.Channel;
|
||||
import com.rabbitmq.client.Connection;
|
||||
import com.rabbitmq.client.Consumer;
|
||||
import com.rabbitmq.client.DefaultConsumer;
|
||||
import com.rabbitmq.client.Envelope;
|
||||
import com.rabbitmq.client.GetResponse;
|
||||
|
||||
/**
|
||||
|
@ -34,12 +41,45 @@ final class AMQPConsumer extends AMQPWorker {
|
|||
|
||||
private final static Logger logger = LoggerFactory.getLogger(AMQPConsumer.class);
|
||||
private final String queueName;
|
||||
private final BlockingQueue<GetResponse> responseQueue;
|
||||
private final boolean autoAcknowledge;
|
||||
private final Consumer consumer;
|
||||
|
||||
AMQPConsumer(Connection connection, String queueName) {
|
||||
private volatile boolean closed = false;
|
||||
|
||||
|
||||
AMQPConsumer(final Connection connection, final String queueName, final boolean autoAcknowledge) throws IOException {
|
||||
super(connection);
|
||||
this.validateStringProperty("queueName", queueName);
|
||||
this.queueName = queueName;
|
||||
this.autoAcknowledge = autoAcknowledge;
|
||||
this.responseQueue = new LinkedBlockingQueue<>(10);
|
||||
|
||||
logger.info("Successfully connected AMQPConsumer to " + connection.toString() + " and '" + queueName + "' queue");
|
||||
|
||||
final Channel channel = getChannel();
|
||||
consumer = new DefaultConsumer(channel) {
|
||||
@Override
|
||||
public void handleDelivery(final String consumerTag, final Envelope envelope, final BasicProperties properties, final byte[] body) throws IOException {
|
||||
if (!autoAcknowledge && closed) {
|
||||
channel.basicReject(envelope.getDeliveryTag(), true);
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
responseQueue.put(new GetResponse(envelope, properties, body, Integer.MAX_VALUE));
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
channel.basicConsume(queueName, autoAcknowledge, consumer);
|
||||
}
|
||||
|
||||
// Visible for unit tests
|
||||
protected Consumer getConsumer() {
|
||||
return consumer;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -53,12 +93,29 @@ final class AMQPConsumer extends AMQPWorker {
|
|||
* @return instance of {@link GetResponse}
|
||||
*/
|
||||
public GetResponse consume() {
|
||||
try {
|
||||
return getChannel().basicGet(this.queueName, true);
|
||||
} catch (IOException e) {
|
||||
logger.error("Failed to receive message from AMQP; " + this + ". Possible reasons: Queue '" + this.queueName
|
||||
+ "' may not have been defined", e);
|
||||
throw new ProcessException(e);
|
||||
return responseQueue.poll();
|
||||
}
|
||||
|
||||
public void acknowledge(final GetResponse response) throws IOException {
|
||||
if (autoAcknowledge) {
|
||||
return;
|
||||
}
|
||||
|
||||
getChannel().basicAck(response.getEnvelope().getDeliveryTag(), true);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws TimeoutException, IOException {
|
||||
closed = true;
|
||||
|
||||
GetResponse lastMessage = null;
|
||||
GetResponse response;
|
||||
while ((response = responseQueue.poll()) != null) {
|
||||
lastMessage = response;
|
||||
}
|
||||
|
||||
if (lastMessage != null) {
|
||||
getChannel().basicNack(lastMessage.getEnvelope().getDeliveryTag(), true, true);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -35,6 +35,7 @@ abstract class AMQPWorker implements AutoCloseable {
|
|||
|
||||
private final static Logger logger = LoggerFactory.getLogger(AMQPWorker.class);
|
||||
private final Channel channel;
|
||||
private boolean closed = false;
|
||||
|
||||
/**
|
||||
* Creates an instance of this worker initializing it with AMQP
|
||||
|
@ -58,20 +59,21 @@ abstract class AMQPWorker implements AutoCloseable {
|
|||
return channel;
|
||||
}
|
||||
|
||||
/**
|
||||
* Closes {@link Channel} created when instance of this class was created.
|
||||
*/
|
||||
|
||||
@Override
|
||||
public void close() throws TimeoutException, IOException {
|
||||
if (closed) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("Closing AMQP channel for " + this.channel.getConnection().toString());
|
||||
}
|
||||
|
||||
this.channel.close();
|
||||
closed = true;
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
@Override
|
||||
public String toString() {
|
||||
return this.getClass().getSimpleName() + ":" + this.channel.getConnection().toString();
|
||||
|
@ -80,10 +82,8 @@ abstract class AMQPWorker implements AutoCloseable {
|
|||
/**
|
||||
* Validates that a String property has value (not null nor empty)
|
||||
*
|
||||
* @param propertyName
|
||||
* the name of the property
|
||||
* @param value
|
||||
* the value of the property
|
||||
* @param propertyName the name of the property
|
||||
* @param value the value of the property
|
||||
*/
|
||||
void validateStringProperty(String propertyName, String value) {
|
||||
if (value == null || value.trim().length() == 0) {
|
||||
|
|
|
@ -142,6 +142,7 @@ abstract class AbstractAMQPProcessor<T extends AMQPWorker> extends AbstractProce
|
|||
|
||||
private final BlockingQueue<AMQPResource<T>> resourceQueue = new LinkedBlockingQueue<>();
|
||||
|
||||
|
||||
/**
|
||||
* Will builds target resource ({@link AMQPPublisher} or {@link AMQPConsumer}) upon first invocation and will delegate to the
|
||||
* implementation of {@link #processResource(ProcessContext, ProcessSession)} method for further processing.
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
*/
|
||||
package org.apache.nifi.amqp.processors;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
|
@ -31,17 +32,19 @@ import org.apache.nifi.annotation.behavior.WritesAttributes;
|
|||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||
import org.apache.nifi.annotation.documentation.Tags;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.expression.ExpressionLanguageScope;
|
||||
import org.apache.nifi.flowfile.FlowFile;
|
||||
import org.apache.nifi.processor.ProcessContext;
|
||||
import org.apache.nifi.processor.ProcessSession;
|
||||
import org.apache.nifi.processor.Relationship;
|
||||
import org.apache.nifi.processor.exception.ProcessException;
|
||||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
|
||||
import com.rabbitmq.client.AMQP.BasicProperties;
|
||||
import com.rabbitmq.client.Connection;
|
||||
import com.rabbitmq.client.GetResponse;
|
||||
|
||||
@Tags({ "amqp", "rabbit", "get", "message", "receive", "consume" })
|
||||
@Tags({"amqp", "rabbit", "get", "message", "receive", "consume"})
|
||||
@InputRequirement(Requirement.INPUT_FORBIDDEN)
|
||||
@CapabilityDescription("Consumes AMQP Messages from an AMQP Broker using the AMQP 0.9.1 protocol. Each message that is received from the AMQP Broker will be "
|
||||
+ "emitted as its own FlowFile to the 'success' relationship.")
|
||||
|
@ -65,16 +68,36 @@ public class ConsumeAMQP extends AbstractAMQPProcessor<AMQPConsumer> {
|
|||
private static final String ATTRIBUTES_PREFIX = "amqp$";
|
||||
|
||||
public static final PropertyDescriptor QUEUE = new PropertyDescriptor.Builder()
|
||||
.name("Queue")
|
||||
.description("The name of the existing AMQP Queue from which messages will be consumed. Usually pre-defined by AMQP administrator. ")
|
||||
.required(true)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.build();
|
||||
.name("Queue")
|
||||
.description("The name of the existing AMQP Queue from which messages will be consumed. Usually pre-defined by AMQP administrator. ")
|
||||
.required(true)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.build();
|
||||
static final PropertyDescriptor AUTO_ACKNOWLEDGE = new PropertyDescriptor.Builder()
|
||||
.name("auto.acknowledge")
|
||||
.displayName("Auto-Acknowledge messages")
|
||||
.description("If true, messages that are received will be auto-acknowledged by the AMQP Broker. "
|
||||
+ "This generally will provide better throughput but could result in messages being lost upon restart of NiFi")
|
||||
.allowableValues("true", "false")
|
||||
.defaultValue("false")
|
||||
.required(true)
|
||||
.build();
|
||||
static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
|
||||
.name("batch.size")
|
||||
.displayName("Batch Size")
|
||||
.description("The maximum number of messages that should be pulled in a single session. Once this many messages have been received (or once no more messages are readily available), "
|
||||
+ "the messages received will be transferred to the 'success' relationship and the messages will be acknowledged with the AMQP Broker. Setting this value to a larger number "
|
||||
+ "could result in better performance, particularly for very small messages, but can also result in more messages being duplicated upon sudden restart of NiFi.")
|
||||
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
|
||||
.defaultValue("10")
|
||||
.required(true)
|
||||
.build();
|
||||
|
||||
public static final Relationship REL_SUCCESS = new Relationship.Builder()
|
||||
.name("success")
|
||||
.description("All FlowFiles that are received from the AMQP queue are routed to this relationship")
|
||||
.build();
|
||||
.name("success")
|
||||
.description("All FlowFiles that are received from the AMQP queue are routed to this relationship")
|
||||
.build();
|
||||
|
||||
private static final List<PropertyDescriptor> propertyDescriptors;
|
||||
private static final Set<Relationship> relationships;
|
||||
|
@ -82,6 +105,8 @@ public class ConsumeAMQP extends AbstractAMQPProcessor<AMQPConsumer> {
|
|||
static {
|
||||
List<PropertyDescriptor> properties = new ArrayList<>();
|
||||
properties.add(QUEUE);
|
||||
properties.add(AUTO_ACKNOWLEDGE);
|
||||
properties.add(BATCH_SIZE);
|
||||
properties.addAll(getCommonPropertyDescriptors());
|
||||
propertyDescriptors = Collections.unmodifiableList(properties);
|
||||
|
||||
|
@ -97,21 +122,40 @@ public class ConsumeAMQP extends AbstractAMQPProcessor<AMQPConsumer> {
|
|||
*/
|
||||
@Override
|
||||
protected void processResource(final Connection connection, final AMQPConsumer consumer, final ProcessContext context, final ProcessSession session) {
|
||||
final GetResponse response = consumer.consume();
|
||||
if (response == null) {
|
||||
context.yield();
|
||||
return;
|
||||
GetResponse lastReceived = null;
|
||||
|
||||
for (int i = 0; i < context.getProperty(BATCH_SIZE).asInteger(); i++) {
|
||||
final GetResponse response = consumer.consume();
|
||||
if (response == null) {
|
||||
if (lastReceived == null) {
|
||||
// If no messages received, then yield.
|
||||
context.yield();
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
FlowFile flowFile = session.create();
|
||||
flowFile = session.write(flowFile, out -> out.write(response.getBody()));
|
||||
|
||||
final BasicProperties amqpProperties = response.getProps();
|
||||
final Map<String, String> attributes = buildAttributes(amqpProperties);
|
||||
flowFile = session.putAllAttributes(flowFile, attributes);
|
||||
|
||||
session.getProvenanceReporter().receive(flowFile, connection.toString() + "/" + context.getProperty(QUEUE).getValue());
|
||||
session.transfer(flowFile, REL_SUCCESS);
|
||||
lastReceived = response;
|
||||
}
|
||||
|
||||
FlowFile flowFile = session.create();
|
||||
flowFile = session.write(flowFile, out -> out.write(response.getBody()));
|
||||
session.commit();
|
||||
|
||||
final BasicProperties amqpProperties = response.getProps();
|
||||
final Map<String, String> attributes = buildAttributes(amqpProperties);
|
||||
flowFile = session.putAllAttributes(flowFile, attributes);
|
||||
|
||||
session.getProvenanceReporter().receive(flowFile, connection.toString() + "/" + context.getProperty(QUEUE).getValue());
|
||||
session.transfer(flowFile, REL_SUCCESS);
|
||||
if (lastReceived != null) {
|
||||
try {
|
||||
consumer.acknowledge(lastReceived);
|
||||
} catch (IOException e) {
|
||||
throw new ProcessException("Failed to acknowledge message", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private Map<String, String> buildAttributes(final BasicProperties properties) {
|
||||
|
@ -142,9 +186,16 @@ public class ConsumeAMQP extends AbstractAMQPProcessor<AMQPConsumer> {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected AMQPConsumer createAMQPWorker(final ProcessContext context, final Connection connection) {
|
||||
final String queueName = context.getProperty(QUEUE).getValue();
|
||||
return new AMQPConsumer(connection, queueName);
|
||||
protected synchronized AMQPConsumer createAMQPWorker(final ProcessContext context, final Connection connection) {
|
||||
try {
|
||||
final String queueName = context.getProperty(QUEUE).getValue();
|
||||
final boolean autoAcknowledge = context.getProperty(AUTO_ACKNOWLEDGE).asBoolean();
|
||||
final AMQPConsumer amqpConsumer = new AMQPConsumer(connection, queueName, autoAcknowledge);
|
||||
|
||||
return amqpConsumer;
|
||||
} catch (final IOException ioe) {
|
||||
throw new ProcessException("Failed to connect to AMQP Broker", ioe);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -18,44 +18,59 @@ package org.apache.nifi.amqp.processors;
|
|||
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
import org.apache.nifi.processor.exception.ProcessException;
|
||||
import org.junit.Test;
|
||||
|
||||
import com.rabbitmq.client.AMQP.BasicProperties;
|
||||
import com.rabbitmq.client.Connection;
|
||||
import com.rabbitmq.client.GetResponse;
|
||||
|
||||
public class AMQPConsumerTest {
|
||||
|
||||
@SuppressWarnings("resource")
|
||||
@Test(expected = IllegalArgumentException.class)
|
||||
public void failOnNullConnection() {
|
||||
new AMQPConsumer(null, null);
|
||||
|
||||
@Test
|
||||
public void testUnconsumedMessagesNacked() throws TimeoutException, IOException {
|
||||
final Map<String, List<String>> routingMap = Collections.singletonMap("key1", Arrays.asList("queue1", "queue2"));
|
||||
final Map<String, String> exchangeToRoutingKeymap = Collections.singletonMap("myExchange", "key1");
|
||||
|
||||
final TestConnection connection = new TestConnection(exchangeToRoutingKeymap, routingMap);
|
||||
final AMQPConsumer consumer = new AMQPConsumer(connection, "queue1", true);
|
||||
consumer.getChannel().basicPublish("myExchange", "key1", new BasicProperties(), new byte[0]);
|
||||
|
||||
consumer.close();
|
||||
assertTrue(((TestChannel) consumer.getChannel()).isNack(0));
|
||||
}
|
||||
|
||||
@Test(expected = IllegalArgumentException.class)
|
||||
public void failOnNullConnection() throws IOException {
|
||||
new AMQPConsumer(null, null, true);
|
||||
}
|
||||
|
||||
@SuppressWarnings("resource")
|
||||
@Test(expected = IllegalArgumentException.class)
|
||||
public void failOnNullQueueName() throws Exception {
|
||||
Connection conn = new TestConnection(null, null);
|
||||
new AMQPConsumer(conn, null);
|
||||
new AMQPConsumer(conn, null, true);
|
||||
}
|
||||
|
||||
@SuppressWarnings("resource")
|
||||
@Test(expected = IllegalArgumentException.class)
|
||||
public void failOnEmptyQueueName() throws Exception {
|
||||
Connection conn = new TestConnection(null, null);
|
||||
new AMQPConsumer(conn, " ");
|
||||
new AMQPConsumer(conn, " ", true);
|
||||
}
|
||||
|
||||
@Test(expected = ProcessException.class)
|
||||
@Test(expected = IOException.class)
|
||||
public void failOnNonExistingQueue() throws Exception {
|
||||
Connection conn = new TestConnection(null, null);
|
||||
try (AMQPConsumer consumer = new AMQPConsumer(conn, "hello")) {
|
||||
try (AMQPConsumer consumer = new AMQPConsumer(conn, "hello", true)) {
|
||||
consumer.consume();
|
||||
}
|
||||
}
|
||||
|
@ -68,7 +83,7 @@ public class AMQPConsumerTest {
|
|||
exchangeToRoutingKeymap.put("", "queue1");
|
||||
|
||||
Connection conn = new TestConnection(exchangeToRoutingKeymap, routingMap);
|
||||
try (AMQPConsumer consumer = new AMQPConsumer(conn, "queue1")) {
|
||||
try (AMQPConsumer consumer = new AMQPConsumer(conn, "queue1", true)) {
|
||||
GetResponse response = consumer.consume();
|
||||
assertNull(response);
|
||||
}
|
||||
|
@ -83,7 +98,7 @@ public class AMQPConsumerTest {
|
|||
|
||||
Connection conn = new TestConnection(exchangeToRoutingKeymap, routingMap);
|
||||
conn.createChannel().basicPublish("myExchange", "key1", null, "hello Joe".getBytes());
|
||||
try (AMQPConsumer consumer = new AMQPConsumer(conn, "queue1")) {
|
||||
try (AMQPConsumer consumer = new AMQPConsumer(conn, "queue1", true)) {
|
||||
GetResponse response = consumer.consume();
|
||||
assertNotNull(response);
|
||||
}
|
||||
|
|
|
@ -16,26 +16,138 @@
|
|||
*/
|
||||
package org.apache.nifi.amqp.processors;
|
||||
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.mockito.Mockito.mock;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
import org.apache.nifi.logging.ComponentLog;
|
||||
import org.apache.nifi.processor.ProcessContext;
|
||||
import org.apache.nifi.processor.exception.ProcessException;
|
||||
import org.apache.nifi.util.MockFlowFile;
|
||||
import org.apache.nifi.util.TestRunner;
|
||||
import org.apache.nifi.util.TestRunners;
|
||||
import org.junit.Test;
|
||||
|
||||
import com.rabbitmq.client.AMQP.BasicProperties;
|
||||
import com.rabbitmq.client.Connection;
|
||||
import com.rabbitmq.client.Envelope;
|
||||
import com.rabbitmq.client.MessageProperties;
|
||||
|
||||
public class ConsumeAMQPTest {
|
||||
|
||||
@Test
|
||||
public void testMessageAcked() throws TimeoutException, IOException {
|
||||
final Map<String, List<String>> routingMap = Collections.singletonMap("key1", Arrays.asList("queue1"));
|
||||
final Map<String, String> exchangeToRoutingKeymap = Collections.singletonMap("myExchange", "key1");
|
||||
|
||||
final Connection connection = new TestConnection(exchangeToRoutingKeymap, routingMap);
|
||||
|
||||
try (AMQPPublisher sender = new AMQPPublisher(connection, mock(ComponentLog.class))) {
|
||||
sender.publish("hello".getBytes(), MessageProperties.PERSISTENT_TEXT_PLAIN, "key1", "myExchange");
|
||||
sender.publish("world".getBytes(), MessageProperties.PERSISTENT_TEXT_PLAIN, "key1", "myExchange");
|
||||
|
||||
ConsumeAMQP proc = new LocalConsumeAMQP(connection);
|
||||
TestRunner runner = TestRunners.newTestRunner(proc);
|
||||
runner.setProperty(ConsumeAMQP.HOST, "injvm");
|
||||
runner.setProperty(ConsumeAMQP.QUEUE, "queue1");
|
||||
runner.setProperty(ConsumeAMQP.AUTO_ACKNOWLEDGE, "false");
|
||||
|
||||
runner.run();
|
||||
|
||||
runner.assertTransferCount(PublishAMQP.REL_SUCCESS, 2);
|
||||
|
||||
final MockFlowFile helloFF = runner.getFlowFilesForRelationship(PublishAMQP.REL_SUCCESS).get(0);
|
||||
helloFF.assertContentEquals("hello");
|
||||
|
||||
final MockFlowFile worldFF = runner.getFlowFilesForRelationship(PublishAMQP.REL_SUCCESS).get(1);
|
||||
worldFF.assertContentEquals("world");
|
||||
|
||||
// A single cumulative ack should be used
|
||||
assertFalse(((TestChannel) connection.createChannel()).isAck(0));
|
||||
assertTrue(((TestChannel) connection.createChannel()).isAck(1));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBatchSizeAffectsAcks() throws TimeoutException, IOException {
|
||||
final Map<String, List<String>> routingMap = Collections.singletonMap("key1", Arrays.asList("queue1"));
|
||||
final Map<String, String> exchangeToRoutingKeymap = Collections.singletonMap("myExchange", "key1");
|
||||
|
||||
final Connection connection = new TestConnection(exchangeToRoutingKeymap, routingMap);
|
||||
|
||||
try (AMQPPublisher sender = new AMQPPublisher(connection, mock(ComponentLog.class))) {
|
||||
sender.publish("hello".getBytes(), MessageProperties.PERSISTENT_TEXT_PLAIN, "key1", "myExchange");
|
||||
sender.publish("world".getBytes(), MessageProperties.PERSISTENT_TEXT_PLAIN, "key1", "myExchange");
|
||||
|
||||
ConsumeAMQP proc = new LocalConsumeAMQP(connection);
|
||||
TestRunner runner = TestRunners.newTestRunner(proc);
|
||||
runner.setProperty(ConsumeAMQP.HOST, "injvm");
|
||||
runner.setProperty(ConsumeAMQP.QUEUE, "queue1");
|
||||
runner.setProperty(ConsumeAMQP.BATCH_SIZE, "1");
|
||||
|
||||
runner.run(2);
|
||||
|
||||
runner.assertTransferCount(PublishAMQP.REL_SUCCESS, 2);
|
||||
|
||||
final MockFlowFile helloFF = runner.getFlowFilesForRelationship(PublishAMQP.REL_SUCCESS).get(0);
|
||||
helloFF.assertContentEquals("hello");
|
||||
|
||||
final MockFlowFile worldFF = runner.getFlowFilesForRelationship(PublishAMQP.REL_SUCCESS).get(1);
|
||||
worldFF.assertContentEquals("world");
|
||||
|
||||
// A single cumulative ack should be used
|
||||
assertTrue(((TestChannel) connection.createChannel()).isAck(0));
|
||||
assertTrue(((TestChannel) connection.createChannel()).isAck(1));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMessagesRejectedOnStop() throws TimeoutException, IOException {
|
||||
final Map<String, List<String>> routingMap = Collections.singletonMap("key1", Arrays.asList("queue1"));
|
||||
final Map<String, String> exchangeToRoutingKeymap = Collections.singletonMap("myExchange", "key1");
|
||||
|
||||
final Connection connection = new TestConnection(exchangeToRoutingKeymap, routingMap);
|
||||
|
||||
try (AMQPPublisher sender = new AMQPPublisher(connection, mock(ComponentLog.class))) {
|
||||
sender.publish("hello".getBytes(), MessageProperties.PERSISTENT_TEXT_PLAIN, "key1", "myExchange");
|
||||
sender.publish("world".getBytes(), MessageProperties.PERSISTENT_TEXT_PLAIN, "key1", "myExchange");
|
||||
sender.publish("good-bye".getBytes(), MessageProperties.PERSISTENT_TEXT_PLAIN, "key1", "myExchange");
|
||||
|
||||
LocalConsumeAMQP proc = new LocalConsumeAMQP(connection);
|
||||
TestRunner runner = TestRunners.newTestRunner(proc);
|
||||
runner.setProperty(ConsumeAMQP.HOST, "injvm");
|
||||
runner.setProperty(ConsumeAMQP.QUEUE, "queue1");
|
||||
runner.setProperty(ConsumeAMQP.BATCH_SIZE, "1");
|
||||
|
||||
runner.run();
|
||||
proc.close();
|
||||
|
||||
runner.assertTransferCount(PublishAMQP.REL_SUCCESS, 1);
|
||||
|
||||
final MockFlowFile helloFF = runner.getFlowFilesForRelationship(PublishAMQP.REL_SUCCESS).get(0);
|
||||
helloFF.assertContentEquals("hello");
|
||||
|
||||
|
||||
// A single cumulative ack should be used
|
||||
assertTrue(((TestChannel) connection.createChannel()).isAck(0));
|
||||
|
||||
// Messages 1 and 2 will have been delivered but on stop should be rejected. They will be rejected
|
||||
// cumulatively, though, so only delivery Tag 2 will be nack'ed explicitly
|
||||
assertTrue(((TestChannel) connection.createChannel()).isNack(2));
|
||||
|
||||
// Any newly delivered messages should also be immediately nack'ed.
|
||||
proc.getAMQPWorker().getConsumer().handleDelivery("123", new Envelope(3, false, "myExchange", "key1"), new BasicProperties(), new byte[0]);
|
||||
assertTrue(((TestChannel) connection.createChannel()).isNack(3));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void validateSuccessfullConsumeAndTransferToSuccess() throws Exception {
|
||||
|
@ -47,8 +159,8 @@ public class ConsumeAMQPTest {
|
|||
try (AMQPPublisher sender = new AMQPPublisher(connection, mock(ComponentLog.class))) {
|
||||
sender.publish("hello".getBytes(), MessageProperties.PERSISTENT_TEXT_PLAIN, "key1", "myExchange");
|
||||
|
||||
ConsumeAMQP pubProc = new LocalConsumeAMQP(connection);
|
||||
TestRunner runner = TestRunners.newTestRunner(pubProc);
|
||||
ConsumeAMQP proc = new LocalConsumeAMQP(connection);
|
||||
TestRunner runner = TestRunners.newTestRunner(proc);
|
||||
runner.setProperty(ConsumeAMQP.HOST, "injvm");
|
||||
runner.setProperty(ConsumeAMQP.QUEUE, "queue1");
|
||||
|
||||
|
@ -56,11 +168,11 @@ public class ConsumeAMQPTest {
|
|||
final MockFlowFile successFF = runner.getFlowFilesForRelationship(PublishAMQP.REL_SUCCESS).get(0);
|
||||
assertNotNull(successFF);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public static class LocalConsumeAMQP extends ConsumeAMQP {
|
||||
private final Connection connection;
|
||||
private AMQPConsumer consumer;
|
||||
|
||||
public LocalConsumeAMQP(Connection connection) {
|
||||
this.connection = connection;
|
||||
|
@ -68,7 +180,20 @@ public class ConsumeAMQPTest {
|
|||
|
||||
@Override
|
||||
protected AMQPConsumer createAMQPWorker(ProcessContext context, Connection connection) {
|
||||
return new AMQPConsumer(connection, context.getProperty(QUEUE).getValue());
|
||||
try {
|
||||
if (consumer != null) {
|
||||
throw new IllegalStateException("Consumer already created");
|
||||
}
|
||||
|
||||
consumer = new AMQPConsumer(connection, context.getProperty(QUEUE).getValue(), context.getProperty(AUTO_ACKNOWLEDGE).asBoolean());
|
||||
return consumer;
|
||||
} catch (IOException e) {
|
||||
throw new ProcessException(e);
|
||||
}
|
||||
}
|
||||
|
||||
public AMQPConsumer getAMQPWorker() {
|
||||
return consumer;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -18,9 +18,11 @@ package org.apache.nifi.amqp.processors;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.BitSet;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.ArrayBlockingQueue;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
|
@ -48,6 +50,7 @@ import com.rabbitmq.client.Connection;
|
|||
import com.rabbitmq.client.Consumer;
|
||||
import com.rabbitmq.client.ConsumerShutdownSignalCallback;
|
||||
import com.rabbitmq.client.DeliverCallback;
|
||||
import com.rabbitmq.client.Envelope;
|
||||
import com.rabbitmq.client.GetResponse;
|
||||
import com.rabbitmq.client.Method;
|
||||
import com.rabbitmq.client.ReturnCallback;
|
||||
|
@ -61,6 +64,8 @@ import com.rabbitmq.client.ShutdownSignalException;
|
|||
class TestChannel implements Channel {
|
||||
|
||||
private final ExecutorService executorService;
|
||||
private final Map<String, List<Consumer>> consumerMap = new HashMap<>();
|
||||
|
||||
private final Map<String, BlockingQueue<GetResponse>> enqueuedMessages;
|
||||
private final Map<String, List<String>> routingKeyToQueueMappings;
|
||||
private final Map<String, String> exchangeToRoutingKeyMappings;
|
||||
|
@ -68,6 +73,9 @@ class TestChannel implements Channel {
|
|||
private boolean open;
|
||||
private boolean corrupted;
|
||||
private Connection connection;
|
||||
private long deliveryTag = 0L;
|
||||
private final BitSet acknowledgments = new BitSet();
|
||||
private final BitSet nacks = new BitSet();
|
||||
|
||||
public TestChannel(Map<String, String> exchangeToRoutingKeyMappings,
|
||||
Map<String, List<String>> routingKeyToQueueMappings) {
|
||||
|
@ -232,7 +240,9 @@ class TestChannel implements Channel {
|
|||
|
||||
if (exchange.equals("")){ // default exchange; routingKey corresponds to a queue.
|
||||
BlockingQueue<GetResponse> messages = this.getMessageQueue(routingKey);
|
||||
GetResponse response = new GetResponse(null, props, body, messages.size());
|
||||
final Envelope envelope = new Envelope(deliveryTag++, false, exchange, routingKey);
|
||||
|
||||
GetResponse response = new GetResponse(envelope, props, body, messages.size());
|
||||
messages.offer(response);
|
||||
} else {
|
||||
String rKey = this.exchangeToRoutingKeyMappings.get(exchange);
|
||||
|
@ -244,8 +254,16 @@ class TestChannel implements Channel {
|
|||
} else {
|
||||
for (String queueName : queueNames) {
|
||||
BlockingQueue<GetResponse> messages = this.getMessageQueue(queueName);
|
||||
GetResponse response = new GetResponse(null, props, body, messages.size());
|
||||
final Envelope envelope = new Envelope(deliveryTag++, false, exchange, routingKey);
|
||||
GetResponse response = new GetResponse(envelope, props, body, messages.size());
|
||||
messages.offer(response);
|
||||
|
||||
final List<Consumer> consumers = consumerMap.get(queueName);
|
||||
if (consumers != null) {
|
||||
for (final Consumer consumer : consumers) {
|
||||
consumer.handleDelivery("consumerTag", envelope, props, body);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
|
@ -461,20 +479,25 @@ class TestChannel implements Channel {
|
|||
|
||||
@Override
|
||||
public void basicAck(long deliveryTag, boolean multiple) throws IOException {
|
||||
throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing");
|
||||
acknowledgments.set((int) deliveryTag);
|
||||
}
|
||||
|
||||
public boolean isAck(final int deliveryTag) {
|
||||
return acknowledgments.get(deliveryTag);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException {
|
||||
throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing");
|
||||
nacks.set((int) deliveryTag);
|
||||
}
|
||||
|
||||
public boolean isNack(final int deliveryTag) {
|
||||
return nacks.get(deliveryTag);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void basicReject(long deliveryTag, boolean requeue) throws IOException {
|
||||
throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing");
|
||||
|
||||
nacks.set((int) deliveryTag);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -484,7 +507,21 @@ class TestChannel implements Channel {
|
|||
|
||||
@Override
|
||||
public String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException {
|
||||
throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing");
|
||||
final BlockingQueue<GetResponse> messageQueue = enqueuedMessages.get(queue);
|
||||
if (messageQueue == null) {
|
||||
throw new IOException("Queue is not defined");
|
||||
}
|
||||
|
||||
consumerMap.computeIfAbsent(queue, q -> new ArrayList<>()).add(callback);
|
||||
|
||||
final String consumerTag = UUID.randomUUID().toString();
|
||||
|
||||
GetResponse message;
|
||||
while ((message = messageQueue.poll()) != null) {
|
||||
callback.handleDelivery(consumerTag, message.getEnvelope(), message.getProps(), message.getBody());
|
||||
}
|
||||
|
||||
return consumerTag;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
Loading…
Reference in New Issue