NIFI-6730 AMQP QoS support

This closes #8146.

Signed-off-by: Peter Turcsanyi <turcsanyi@apache.org>
This commit is contained in:
Mikhail Sapozhnikov 2023-12-08 14:38:14 +03:00 committed by Peter Turcsanyi
parent 3c4ccd2c64
commit 76613a0ed4
No known key found for this signature in database
GPG Key ID: 55A813F1C3E553DC
5 changed files with 45 additions and 12 deletions

View File

@ -43,7 +43,8 @@ final class AMQPConsumer extends AMQPWorker {
private final boolean autoAcknowledge;
private final Consumer consumer;
AMQPConsumer(final Connection connection, final String queueName, final boolean autoAcknowledge, ComponentLog processorLog) throws IOException {
AMQPConsumer(final Connection connection, final String queueName, final boolean autoAcknowledge, final int prefetchCount,
ComponentLog processorLog) throws IOException {
super(connection, processorLog);
this.validateStringProperty("queueName", queueName);
this.queueName = queueName;
@ -80,6 +81,7 @@ final class AMQPConsumer extends AMQPWorker {
}
};
channel.basicQos(prefetchCount);
channel.basicConsume(queueName, autoAcknowledge, consumer);
}

View File

@ -112,6 +112,17 @@ public class ConsumeAMQP extends AbstractAMQPProcessor<AMQPConsumer> {
.defaultValue("10")
.required(true)
.build();
static final PropertyDescriptor PREFETCH_COUNT = new PropertyDescriptor.Builder()
.name("prefetch.count")
.displayName("Prefetch Count")
.description("The maximum number of unacknowledged messages for the consumer. If consumer has this number of unacknowledged messages, AMQP broker will "
+ "no longer send new messages until consumer acknowledges some of the messages already delivered to it."
+ "Allowed values: from 0 to 65535. 0 means no limit")
.addValidator(StandardValidators.createLongValidator(0, 65535, true))
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.defaultValue("0")
.required(true)
.build();
public static final PropertyDescriptor HEADER_FORMAT = new PropertyDescriptor.Builder()
.name("header.format")
@ -167,6 +178,7 @@ public class ConsumeAMQP extends AbstractAMQPProcessor<AMQPConsumer> {
properties.add(QUEUE);
properties.add(AUTO_ACKNOWLEDGE);
properties.add(BATCH_SIZE);
properties.add(PREFETCH_COUNT);
properties.add(HEADER_FORMAT);
properties.add(HEADER_KEY_PREFIX);
properties.add(HEADER_SEPARATOR);
@ -301,7 +313,8 @@ public class ConsumeAMQP extends AbstractAMQPProcessor<AMQPConsumer> {
try {
final String queueName = context.getProperty(QUEUE).getValue();
final boolean autoAcknowledge = context.getProperty(AUTO_ACKNOWLEDGE).asBoolean();
final AMQPConsumer amqpConsumer = new AMQPConsumer(connection, queueName, autoAcknowledge, getLogger());
final int prefetchCount = context.getProperty(PREFETCH_COUNT).asInteger();
final AMQPConsumer amqpConsumer = new AMQPConsumer(connection, queueName, autoAcknowledge, prefetchCount, getLogger());
return amqpConsumer;
} catch (final IOException ioe) {

View File

@ -42,6 +42,7 @@ import org.junit.jupiter.api.Test;
public class AMQPConsumerTest {
private static final int DEFAULT_PREFETCH_COUNT = 0;
private ComponentLog processorLog;
@BeforeEach
@ -55,7 +56,7 @@ public class AMQPConsumerTest {
final Map<String, String> exchangeToRoutingKeymap = Collections.singletonMap("myExchange", "key1");
final TestConnection connection = new TestConnection(exchangeToRoutingKeymap, routingMap);
final AMQPConsumer consumer = new AMQPConsumer(connection, "queue1", true, processorLog);
final AMQPConsumer consumer = new AMQPConsumer(connection, "queue1", true, DEFAULT_PREFETCH_COUNT, processorLog);
consumer.getChannel().basicPublish("myExchange", "key1", new BasicProperties(), new byte[0]);
consumer.close();
@ -69,7 +70,7 @@ public class AMQPConsumerTest {
final Map<String, String> exchangeToRoutingKeymap = Collections.singletonMap("myExchange", "key1");
final TestConnection connection = new TestConnection(exchangeToRoutingKeymap, routingMap);
final AMQPConsumer consumer = new AMQPConsumer(connection, "queue1", true, processorLog);
final AMQPConsumer consumer = new AMQPConsumer(connection, "queue1", true, DEFAULT_PREFETCH_COUNT, processorLog);
assertFalse(consumer.closed);
@ -80,14 +81,14 @@ public class AMQPConsumerTest {
@Test
public void failOnNullConnection() {
assertThrows(IllegalArgumentException.class, () -> new AMQPConsumer(null, null, true, processorLog));
assertThrows(IllegalArgumentException.class, () -> new AMQPConsumer(null, null, true, DEFAULT_PREFETCH_COUNT, processorLog));
}
@Test
public void failOnNullQueueName() {
assertThrows(IllegalArgumentException.class, () -> {
Connection conn = new TestConnection(null, null);
new AMQPConsumer(conn, null, true, processorLog);
new AMQPConsumer(conn, null, true, DEFAULT_PREFETCH_COUNT, processorLog);
});
}
@ -95,7 +96,7 @@ public class AMQPConsumerTest {
public void failOnEmptyQueueName() {
assertThrows(IllegalArgumentException.class, () -> {
Connection conn = new TestConnection(null, null);
new AMQPConsumer(conn, " ", true, processorLog);
new AMQPConsumer(conn, " ", true, DEFAULT_PREFETCH_COUNT, processorLog);
});
}
@ -103,7 +104,7 @@ public class AMQPConsumerTest {
public void failOnNonExistingQueue() {
assertThrows(IOException.class, () -> {
Connection conn = new TestConnection(null, null);
try (AMQPConsumer consumer = new AMQPConsumer(conn, "hello", true, processorLog)) {
try (AMQPConsumer consumer = new AMQPConsumer(conn, "hello", true, DEFAULT_PREFETCH_COUNT, processorLog)) {
consumer.consume();
}
});
@ -117,7 +118,7 @@ public class AMQPConsumerTest {
exchangeToRoutingKeymap.put("", "queue1");
Connection conn = new TestConnection(exchangeToRoutingKeymap, routingMap);
try (AMQPConsumer consumer = new AMQPConsumer(conn, "queue1", true, processorLog)) {
try (AMQPConsumer consumer = new AMQPConsumer(conn, "queue1", true, DEFAULT_PREFETCH_COUNT, processorLog)) {
GetResponse response = consumer.consume();
assertNull(response);
}
@ -132,9 +133,20 @@ public class AMQPConsumerTest {
Connection conn = new TestConnection(exchangeToRoutingKeymap, routingMap);
conn.createChannel().basicPublish("myExchange", "key1", null, "hello Joe".getBytes());
try (AMQPConsumer consumer = new AMQPConsumer(conn, "queue1", true, processorLog)) {
try (AMQPConsumer consumer = new AMQPConsumer(conn, "queue1", true, DEFAULT_PREFETCH_COUNT, processorLog)) {
GetResponse response = consumer.consume();
assertNotNull(response);
}
}
@Test
public void validatePrefetchSet() throws Exception {
final Map<String, List<String>> routingMap = Collections.singletonMap("key1", Arrays.asList("queue1"));
final Map<String, String> exchangeToRoutingKeymap = Collections.singletonMap("myExchange", "key1");
Connection conn = new TestConnection(exchangeToRoutingKeymap, routingMap);
try (AMQPConsumer consumer = new AMQPConsumer(conn, "queue1", true, 100, processorLog)) {
TestChannel channel = (TestChannel)consumer.getChannel();
assertEquals(100, channel.getPrefetchCount());
}
}
}

View File

@ -393,7 +393,9 @@ public class ConsumeAMQPTest {
throw new IllegalStateException("Consumer already created");
}
consumer = new AMQPConsumer(connection, context.getProperty(ConsumeAMQP.QUEUE).getValue(), context.getProperty(ConsumeAMQP.AUTO_ACKNOWLEDGE).asBoolean(), getLogger());
consumer = new AMQPConsumer(connection, context.getProperty(ConsumeAMQP.QUEUE).getValue(),
context.getProperty(ConsumeAMQP.AUTO_ACKNOWLEDGE).asBoolean(), context.getProperty(ConsumeAMQP.PREFETCH_COUNT).asInteger(),
getLogger());
return consumer;
} catch (IOException e) {
throw new ProcessException(e);

View File

@ -77,6 +77,7 @@ class TestChannel implements Channel {
private long deliveryTag = 0L;
private final BitSet acknowledgments = new BitSet();
private final BitSet nacks = new BitSet();
private int prefetchCount = 0;
public TestChannel(Map<String, String> exchangeToRoutingKeyMappings,
Map<String, List<String>> routingKeyToQueueMappings) {
@ -222,8 +223,11 @@ class TestChannel implements Channel {
@Override
public void basicQos(int prefetchCount) throws IOException {
throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing");
this.prefetchCount = prefetchCount;
}
public int getPrefetchCount() {
return this.prefetchCount;
}
@Override