NIFI-8198: ConsumeAMQP detects if Queue is deleted on server

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #4805.
This commit is contained in:
Peter Turcsanyi 2021-02-04 22:31:00 +01:00 committed by Pierre Villard
parent f2555f27f1
commit f101a2bba5
No known key found for this signature in database
GPG Key ID: F92A93B30C07C6D5
3 changed files with 31 additions and 2 deletions

View File

@ -68,6 +68,16 @@ final class AMQPConsumer extends AMQPWorker {
Thread.currentThread().interrupt();
}
}
@Override
public void handleCancel(String consumerTag) throws IOException {
processorLog.error("Consumer has been cancelled by the broker, eg. due to deleted queue.");
try {
close();
} catch (Exception e) {
processorLog.error("Failed to close consumer.", e);
}
}
};
channel.basicConsume(queueName, autoAcknowledge, consumer);

View File

@ -17,8 +17,10 @@
package org.apache.nifi.amqp.processors;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import java.io.IOException;
@ -60,6 +62,21 @@ public class AMQPConsumerTest {
assertEquals(0, consumer.getResponseQueueSize());
}
@Test
public void testConsumerHandlesCancelling() throws IOException {
final Map<String, List<String>> routingMap = Collections.singletonMap("key1", Arrays.asList("queue1", "queue2"));
final Map<String, String> exchangeToRoutingKeymap = Collections.singletonMap("myExchange", "key1");
final TestConnection connection = new TestConnection(exchangeToRoutingKeymap, routingMap);
final AMQPConsumer consumer = new AMQPConsumer(connection, "queue1", true, processorLog);
assertFalse(consumer.closed);
consumer.getChannel().basicCancel("queue1");
assertTrue(consumer.closed);
}
@Test(expected = IllegalArgumentException.class)
public void failOnNullConnection() throws IOException {
new AMQPConsumer(null, null, true, processorLog);

View File

@ -548,8 +548,10 @@ class TestChannel implements Channel {
@Override
public void basicCancel(String consumerTag) throws IOException {
throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing");
// consumerMap is indexed by queue name so the passed in consumerTag parameter needs to be the name of the test queue
for (Consumer consumer: consumerMap.get(consumerTag)) {
consumer.handleCancel(consumerTag);
}
}
@Override