From 7b4cce9e219b86e6c6ade357d57a5f9b87c5c70e Mon Sep 17 00:00:00 2001 From: Peter Turcsanyi Date: Tue, 14 Jul 2020 22:13:16 +0200 Subject: [PATCH] NIFI-6312: Improved connection handling in AMQP processors Disable connection automatic recovery which can lead to uncontrolled/stale threads. Handle the recovery in the processors instead. Use poisoning in case of errors, then discarding and recreating the poisoned consumer/publisher. NIFI-6312: Use conventional exception handling instead of poisoning Use component logger in workers. Remove basicNack()/basicReject() calls as they are not needed because all unacknowledged messages will be redelivered. NIFI-6312: Further improve exception handling and error logging. NIFI-6312: Fix consumer closing in previous commit NIFI-6312: Use custom executor with a single thread (no more is used by the processor) Reviewed by tamas palfy and simon bence Signed-off-by: Joe Witt --- .../nifi/amqp/processors/AMQPConsumer.java | 52 ++++++----- .../nifi/amqp/processors/AMQPException.java | 32 +++++++ .../nifi/amqp/processors/AMQPPublisher.java | 40 ++++---- .../nifi/amqp/processors/AMQPResource.java | 29 +++++- .../processors/AMQPRollbackException.java | 32 +++++++ .../nifi/amqp/processors/AMQPWorker.java | 25 ++--- .../processors/AbstractAMQPProcessor.java | 91 ++++++++++++++----- .../nifi/amqp/processors/ConsumeAMQP.java | 35 +++---- .../nifi/amqp/processors/PublishAMQP.java | 12 ++- .../amqp/processors/AMQPConsumerTest.java | 30 ++++-- .../amqp/processors/AMQPPublisherTest.java | 4 +- .../nifi/amqp/processors/ConsumeAMQPTest.java | 19 ++-- .../nifi/amqp/processors/PublishAMQPTest.java | 3 +- .../nifi/amqp/processors/TestChannel.java | 4 + 14 files changed, 278 insertions(+), 130 deletions(-) create mode 100644 nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPException.java create mode 100644 nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPRollbackException.java diff --git a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPConsumer.java b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPConsumer.java index 8872e0cd50..d2c47dc387 100644 --- a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPConsumer.java +++ b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPConsumer.java @@ -21,9 +21,8 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeoutException; +import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.exception.ProcessException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import com.rabbitmq.client.AMQP.BasicProperties; import com.rabbitmq.client.Channel; @@ -39,30 +38,27 @@ import com.rabbitmq.client.GetResponse; */ final class AMQPConsumer extends AMQPWorker { - private final static Logger logger = LoggerFactory.getLogger(AMQPConsumer.class); private final String queueName; private final BlockingQueue responseQueue; private final boolean autoAcknowledge; private final Consumer consumer; - private volatile boolean closed = false; - - - AMQPConsumer(final Connection connection, final String queueName, final boolean autoAcknowledge) throws IOException { - super(connection); + AMQPConsumer(final Connection connection, final String queueName, final boolean autoAcknowledge, ComponentLog processorLog) throws IOException { + super(connection, processorLog); this.validateStringProperty("queueName", queueName); this.queueName = queueName; this.autoAcknowledge = autoAcknowledge; this.responseQueue = new LinkedBlockingQueue<>(10); - logger.info("Successfully connected AMQPConsumer to " + connection.toString() + " and '" + queueName + "' queue"); + processorLog.info("Successfully connected AMQPConsumer to " + connection.toString() + " and '" + queueName + "' queue"); final Channel channel = getChannel(); consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(final String consumerTag, final Envelope envelope, final BasicProperties properties, final byte[] body) throws IOException { - if (!autoAcknowledge && closed) { - channel.basicReject(envelope.getDeliveryTag(), true); + if (closed) { + // simply discard the messages, all unacknowledged messages will be redelivered by the broker when the consumer connects again + processorLog.info("Consumer is closed, discarding message (delivery tag: {}).", new Object[]{envelope.getDeliveryTag()}); return; } @@ -78,8 +74,8 @@ final class AMQPConsumer extends AMQPWorker { } // Visible for unit tests - protected Consumer getConsumer() { - return consumer; + int getResponseQueueSize() { + return responseQueue.size(); } /** @@ -96,26 +92,32 @@ final class AMQPConsumer extends AMQPWorker { return responseQueue.poll(); } - public void acknowledge(final GetResponse response) throws IOException { + public void acknowledge(final GetResponse response) { if (autoAcknowledge) { return; } - getChannel().basicAck(response.getEnvelope().getDeliveryTag(), true); + try { + getChannel().basicAck(response.getEnvelope().getDeliveryTag(), true); + } catch (Exception e) { + throw new AMQPException("Failed to acknowledge message", e); + } } @Override public void close() throws TimeoutException, IOException { - closed = true; - - GetResponse lastMessage = null; - GetResponse response; - while ((response = responseQueue.poll()) != null) { - lastMessage = response; - } - - if (lastMessage != null) { - getChannel().basicNack(lastMessage.getEnvelope().getDeliveryTag(), true, true); + try { + super.close(); + } finally { + try { + GetResponse response; + while ((response = responseQueue.poll()) != null) { + // simply discard the messages, all unacknowledged messages will be redelivered by the broker when the consumer connects again + processorLog.info("Consumer is closed, discarding message (delivery tag: {}).", new Object[]{response.getEnvelope().getDeliveryTag()}); + } + } catch (Exception e) { + processorLog.error("Failed to drain response queue."); + } } } diff --git a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPException.java b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPException.java new file mode 100644 index 0000000000..18f8b34b02 --- /dev/null +++ b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPException.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.amqp.processors; + +/** + * Exception to indicate an AMQP related error when the FlowFile should not be tried to process again but it should be sent to failure. + * AMQPException and AMQPRollbackException are not interchangeable because of the difference in the expected error handling. + */ +public class AMQPException extends RuntimeException { + + public AMQPException(String message) { + super(message); + } + + public AMQPException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPPublisher.java b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPPublisher.java index 553fc835cc..fed998cd03 100644 --- a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPPublisher.java +++ b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPPublisher.java @@ -17,11 +17,12 @@ package org.apache.nifi.amqp.processors; import java.io.IOException; +import java.net.SocketException; +import com.rabbitmq.client.AlreadyClosedException; import org.apache.nifi.logging.ComponentLog; import com.rabbitmq.client.AMQP.BasicProperties; -import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ReturnListener; @@ -31,7 +32,6 @@ import com.rabbitmq.client.ReturnListener; */ final class AMQPPublisher extends AMQPWorker { - private final ComponentLog processLog; private final String connectionString; /** @@ -39,11 +39,12 @@ final class AMQPPublisher extends AMQPWorker { * * @param connection instance of AMQP {@link Connection} */ - AMQPPublisher(Connection connection, ComponentLog processLog) { - super(connection); - this.processLog = processLog; + AMQPPublisher(Connection connection, ComponentLog processorLog) { + super(connection, processorLog); getChannel().addReturnListener(new UndeliverableMessageLogger()); this.connectionString = connection.toString(); + + processorLog.info("Successfully connected AMQPPublisher to " + this.connectionString); } /** @@ -60,21 +61,21 @@ final class AMQPPublisher extends AMQPWorker { void publish(byte[] bytes, BasicProperties properties, String routingKey, String exchange) { this.validateStringProperty("routingKey", routingKey); exchange = exchange == null ? "" : exchange.trim(); - if (exchange.length() == 0) { - processLog.info("The 'exchangeName' is not specified. Messages will be sent to default exchange"); - } - processLog.info("Successfully connected AMQPPublisher to " + this.connectionString + " and '" + exchange - + "' exchange with '" + routingKey + "' as a routing key."); - final Channel channel = getChannel(); - if (channel.isOpen()) { - try { - channel.basicPublish(exchange, routingKey, true, properties, bytes); - } catch (Exception e) { - throw new IllegalStateException("Failed to publish to Exchange '" + exchange + "' with Routing Key '" + routingKey + "'.", e); + if (processorLog.isDebugEnabled()) { + if (exchange.length() == 0) { + processorLog.debug("The 'exchangeName' is not specified. Messages will be sent to default exchange"); } - } else { - throw new IllegalStateException("This instance of AMQPPublisher is invalid since its publishingChannel is closed"); + processorLog.debug("Successfully connected AMQPPublisher to " + this.connectionString + " and '" + exchange + + "' exchange with '" + routingKey + "' as a routing key."); + } + + try { + getChannel().basicPublish(exchange, routingKey, true, properties, bytes); + } catch (AlreadyClosedException | SocketException e) { + throw new AMQPRollbackException("Failed to publish message because the AMQP connection is lost or has been closed", e); + } catch (Exception e) { + throw new AMQPException("Failed to publish message to Exchange '" + exchange + "' with Routing Key '" + routingKey + "'.", e); } } @@ -100,8 +101,7 @@ final class AMQPPublisher extends AMQPWorker { public void handleReturn(int replyCode, String replyText, String exchangeName, String routingKey, BasicProperties properties, byte[] message) throws IOException { String logMessage = "Message destined for '" + exchangeName + "' exchange with '" + routingKey + "' as routing key came back with replyCode=" + replyCode + " and replyText=" + replyText + "."; - processLog.warn(logMessage); - AMQPPublisher.this.processLog.warn(logMessage); + processorLog.warn(logMessage); } } } diff --git a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPResource.java b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPResource.java index 2319e7a90d..96cea27f24 100644 --- a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPResource.java +++ b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPResource.java @@ -19,17 +19,19 @@ package org.apache.nifi.amqp.processors; import java.io.Closeable; import java.io.IOException; -import java.util.concurrent.TimeoutException; +import java.util.concurrent.ExecutorService; import com.rabbitmq.client.Connection; public class AMQPResource implements Closeable { private final Connection connection; + private final ExecutorService executor; private final T worker; - public AMQPResource(final Connection connection, final T worker) { + public AMQPResource(final Connection connection, final T worker, final ExecutorService executor) { this.connection = connection; this.worker = worker; + this.executor = executor; } public Connection getConnection() { @@ -48,23 +50,40 @@ public class AMQPResource implements Closeable { worker.close(); } catch (final IOException e) { ioe = e; - } catch (final TimeoutException e) { + } catch (final Exception e) { ioe = new IOException(e); } try { - connection.close(); + if (connection.isOpen()) { + connection.close(); + } } catch (final IOException e) { if (ioe == null) { ioe = e; } else { ioe.addSuppressed(e); } + } catch (final Exception e) { + if (ioe == null) { + ioe = new IOException(e); + } else { + ioe.addSuppressed(e); + } + } + + try { + executor.shutdown(); + } catch (final Exception e) { + if (ioe == null) { + ioe = new IOException(e); + } else { + ioe.addSuppressed(e); + } } if (ioe != null) { throw ioe; } } - } diff --git a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPRollbackException.java b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPRollbackException.java new file mode 100644 index 0000000000..3324b401d7 --- /dev/null +++ b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPRollbackException.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.amqp.processors; + +/** + * Exception to indicate an AMQP related error when the FlowFile should be tried to process again so the NiFi session should be rolled back. + * AMQPRollbackException and AMQPException are not interchangeable because of the difference in the expected error handling. + */ +public class AMQPRollbackException extends RuntimeException { + + public AMQPRollbackException(String message) { + super(message); + } + + public AMQPRollbackException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPWorker.java b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPWorker.java index d17ea0d1c9..fca0f50e3e 100644 --- a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPWorker.java +++ b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPWorker.java @@ -19,8 +19,7 @@ package org.apache.nifi.amqp.processors; import java.io.IOException; import java.util.concurrent.TimeoutException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.apache.nifi.logging.ComponentLog; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; @@ -33,9 +32,10 @@ import com.rabbitmq.client.Connection; */ abstract class AMQPWorker implements AutoCloseable { - private final static Logger logger = LoggerFactory.getLogger(AMQPWorker.class); + protected final ComponentLog processorLog; + private final Channel channel; - private boolean closed = false; + protected volatile boolean closed = false; /** * Creates an instance of this worker initializing it with AMQP @@ -44,13 +44,15 @@ abstract class AMQPWorker implements AutoCloseable { * * @param connection instance of {@link Connection} */ - public AMQPWorker(final Connection connection) { + public AMQPWorker(final Connection connection, ComponentLog processorLog) { + this.processorLog = processorLog; + validateConnection(connection); try { this.channel = connection.createChannel(); } catch (IOException e) { - logger.error("Failed to create Channel for " + connection, e); + processorLog.error("Failed to create Channel for " + connection, e); throw new IllegalStateException(e); } } @@ -59,18 +61,19 @@ abstract class AMQPWorker implements AutoCloseable { return channel; } - @Override public void close() throws TimeoutException, IOException { if (closed) { return; } - if (logger.isDebugEnabled()) { - logger.debug("Closing AMQP channel for " + this.channel.getConnection().toString()); - } + if (channel.isOpen()) { + if (processorLog.isDebugEnabled()) { + processorLog.debug("Closing AMQP channel for " + this.channel.getConnection().toString()); + } - this.channel.close(); + this.channel.close(); + } closed = true; } diff --git a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AbstractAMQPProcessor.java b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AbstractAMQPProcessor.java index 3344dc9b3f..c947b7a286 100644 --- a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AbstractAMQPProcessor.java +++ b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AbstractAMQPProcessor.java @@ -24,8 +24,14 @@ import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import javax.net.ssl.SSLContext; + +import com.rabbitmq.client.impl.DefaultExceptionHandler; +import org.apache.commons.lang3.concurrent.BasicThreadFactory; +import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; @@ -139,8 +145,12 @@ abstract class AbstractAMQPProcessor extends AbstractProce return propertyDescriptors; } - private final BlockingQueue> resourceQueue = new LinkedBlockingQueue<>(); + private BlockingQueue> resourceQueue; + @OnScheduled + public void onScheduled(ProcessContext context) { + resourceQueue = new LinkedBlockingQueue<>(context.getMaxConcurrentTasks()); + } @Override protected Collection customValidate(ValidationContext context) { @@ -190,33 +200,49 @@ abstract class AbstractAMQPProcessor extends AbstractProce public final void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { AMQPResource resource = resourceQueue.poll(); if (resource == null) { - resource = createResource(context); + try { + resource = createResource(context); + } catch (Exception e) { + getLogger().error("Failed to initialize AMQP client", e); + context.yield(); + return; + } } try { processResource(resource.getConnection(), resource.getWorker(), context, session); - resourceQueue.offer(resource); - } catch (final Exception e) { - try { - resource.close(); - } catch (final Exception e2) { - e.addSuppressed(e2); - } - throw e; + if (!resourceQueue.offer(resource)) { + getLogger().info("Worker queue is full, closing AMQP client"); + closeResource(resource); + } + } catch (AMQPException | AMQPRollbackException e) { + getLogger().error("AMQP failure, dropping the client", e); + context.yield(); + closeResource(resource); + } catch (Exception e) { + getLogger().error("Processor failure", e); + context.yield(); } } @OnStopped public void close() { - AMQPResource resource; - while ((resource = resourceQueue.poll()) != null) { - try { - resource.close(); - } catch (final Exception e) { - getLogger().warn("Failed to close AMQP Connection", e); + if (resourceQueue != null) { + AMQPResource resource; + while ((resource = resourceQueue.poll()) != null) { + closeResource(resource); } + resourceQueue = null; + } + } + + private void closeResource(AMQPResource resource) { + try { + resource.close(); + } catch (Exception e) { + getLogger().error("Failed to close AMQP Connection", e); } } @@ -235,13 +261,28 @@ abstract class AbstractAMQPProcessor extends AbstractProce private AMQPResource createResource(final ProcessContext context) { - final Connection connection = createConnection(context); - final T worker = createAMQPWorker(context, connection); - return new AMQPResource<>(connection, worker); + Connection connection = null; + try { + ExecutorService executor = Executors.newSingleThreadExecutor(new BasicThreadFactory.Builder() + .namingPattern("AMQP Consumer: " + getIdentifier()) + .build()); + connection = createConnection(context, executor); + T worker = createAMQPWorker(context, connection); + return new AMQPResource<>(connection, worker, executor); + } catch (Exception e) { + if (connection != null && connection.isOpen()) { + try { + connection.close(); + } catch (Exception closingEx) { + getLogger().error("Failed to close AMQP Connection", closingEx); + } + } + throw e; + } } - protected Connection createConnection(ProcessContext context) { + protected Connection createConnection(ProcessContext context, ExecutorService executor) { final ConnectionFactory cf = new ConnectionFactory(); cf.setHost(context.getProperty(HOST).evaluateAttributeExpressions().getValue()); cf.setPort(Integer.parseInt(context.getProperty(PORT).evaluateAttributeExpressions().getValue())); @@ -268,8 +309,16 @@ abstract class AbstractAMQPProcessor extends AbstractProce } } + cf.setAutomaticRecoveryEnabled(false); + cf.setExceptionHandler(new DefaultExceptionHandler() { + @Override + public void handleUnexpectedConnectionDriverException(Connection conn, Throwable exception) { + getLogger().error("Connection lost to server {}:{}.", new Object[]{conn.getAddress(), conn.getPort()}, exception); + } + }); + try { - Connection connection = cf.newConnection(); + Connection connection = cf.newConnection(executor); return connection; } catch (Exception e) { throw new IllegalStateException("Failed to establish connection with AMQP Broker: " + cf.toString(), e); diff --git a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/ConsumeAMQP.java b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/ConsumeAMQP.java index 3af4ee91f7..6c83a20b8a 100644 --- a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/ConsumeAMQP.java +++ b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/ConsumeAMQP.java @@ -75,9 +75,12 @@ public class ConsumeAMQP extends AbstractAMQPProcessor { .build(); static final PropertyDescriptor AUTO_ACKNOWLEDGE = new PropertyDescriptor.Builder() .name("auto.acknowledge") - .displayName("Auto-Acknowledge messages") - .description("If true, messages that are received will be auto-acknowledged by the AMQP Broker. " - + "This generally will provide better throughput but could result in messages being lost upon restart of NiFi") + .displayName("Auto-Acknowledge Messages") + .description(" If false (Non-Auto-Acknowledge), the messages will be acknowledged by the processor after transferring the FlowFiles to success and committing " + + "the NiFi session. Non-Auto-Acknowledge mode provides 'at-least-once' delivery semantics. " + + "If true (Auto-Acknowledge), messages that are delivered to the AMQP Client will be auto-acknowledged by the AMQP Broker just after sending them out. " + + "This generally will provide better throughput but will also result in messages being lost upon restart/crash of the AMQP Broker, NiFi or the processor. " + + "Auto-Acknowledge mode provides 'at-most-once' delivery semantics and it is recommended only if loosing messages is acceptable.") .allowableValues("true", "false") .defaultValue("false") .required(true) @@ -85,8 +88,8 @@ public class ConsumeAMQP extends AbstractAMQPProcessor { static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder() .name("batch.size") .displayName("Batch Size") - .description("The maximum number of messages that should be pulled in a single session. Once this many messages have been received (or once no more messages are readily available), " - + "the messages received will be transferred to the 'success' relationship and the messages will be acknowledged with the AMQP Broker. Setting this value to a larger number " + .description("The maximum number of messages that should be processed in a single session. Once this many messages have been received (or once no more messages are readily available), " + + "the messages received will be transferred to the 'success' relationship and the messages will be acknowledged to the AMQP Broker. Setting this value to a larger number " + "could result in better performance, particularly for very small messages, but can also result in more messages being duplicated upon sudden restart of NiFi.") .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) .expressionLanguageSupported(ExpressionLanguageScope.NONE) @@ -124,6 +127,10 @@ public class ConsumeAMQP extends AbstractAMQPProcessor { protected void processResource(final Connection connection, final AMQPConsumer consumer, final ProcessContext context, final ProcessSession session) { GetResponse lastReceived = null; + if (!connection.isOpen() || !consumer.getChannel().isOpen()) { + throw new AMQPException("AMQP client has lost connection."); + } + for (int i = 0; i < context.getProperty(BATCH_SIZE).asInteger(); i++) { final GetResponse response = consumer.consume(); if (response == null) { @@ -147,14 +154,9 @@ public class ConsumeAMQP extends AbstractAMQPProcessor { lastReceived = response; } - session.commit(); - if (lastReceived != null) { - try { - consumer.acknowledge(lastReceived); - } catch (IOException e) { - throw new ProcessException("Failed to acknowledge message", e); - } + session.commit(); + consumer.acknowledge(lastReceived); } } @@ -190,17 +192,10 @@ public class ConsumeAMQP extends AbstractAMQPProcessor { try { final String queueName = context.getProperty(QUEUE).getValue(); final boolean autoAcknowledge = context.getProperty(AUTO_ACKNOWLEDGE).asBoolean(); - final AMQPConsumer amqpConsumer = new AMQPConsumer(connection, queueName, autoAcknowledge); + final AMQPConsumer amqpConsumer = new AMQPConsumer(connection, queueName, autoAcknowledge, getLogger()); return amqpConsumer; } catch (final IOException ioe) { - try { - connection.close(); - getLogger().warn("Closed connection at port " + connection.getPort()); - } catch (final IOException ioeClose) { - throw new ProcessException("Failed to close connection at port " + connection.getPort()); - } - throw new ProcessException("Failed to connect to AMQP Broker", ioe); } } diff --git a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java index 4dbb7ed3a5..1520d1baaf 100644 --- a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java +++ b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java @@ -155,12 +155,16 @@ public class PublishAMQP extends AbstractAMQPProcessor { try { publisher.publish(messageContent, amqpProperties, routingKey, exchange); - session.transfer(flowFile, REL_SUCCESS); - session.getProvenanceReporter().send(flowFile, connection.toString() + "/E:" + exchange + "/RK:" + routingKey); - } catch (Exception e) { + } catch (AMQPRollbackException e) { + session.rollback(); + throw e; + } catch (AMQPException e) { session.transfer(session.penalize(flowFile), REL_FAILURE); - getLogger().error("Failed while sending message to AMQP via " + publisher, e); + throw e; } + + session.transfer(flowFile, REL_SUCCESS); + session.getProvenanceReporter().send(flowFile, connection.toString() + "/E:" + exchange + "/RK:" + routingKey); } diff --git a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPConsumerTest.java b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPConsumerTest.java index 87278fd20c..8811053804 100644 --- a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPConsumerTest.java +++ b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPConsumerTest.java @@ -16,9 +16,10 @@ */ package org.apache.nifi.amqp.processors; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; import java.io.IOException; import java.util.Arrays; @@ -28,6 +29,8 @@ import java.util.List; import java.util.Map; import java.util.concurrent.TimeoutException; +import org.apache.nifi.logging.ComponentLog; +import org.junit.Before; import org.junit.Test; import com.rabbitmq.client.AMQP.BasicProperties; @@ -36,41 +39,48 @@ import com.rabbitmq.client.GetResponse; public class AMQPConsumerTest { + private ComponentLog processorLog; + + @Before + public void setUp() { + processorLog = mock(ComponentLog.class); + } @Test - public void testUnconsumedMessagesNacked() throws TimeoutException, IOException { + public void testResponseQueueDrained() throws TimeoutException, IOException { final Map> routingMap = Collections.singletonMap("key1", Arrays.asList("queue1", "queue2")); final Map exchangeToRoutingKeymap = Collections.singletonMap("myExchange", "key1"); final TestConnection connection = new TestConnection(exchangeToRoutingKeymap, routingMap); - final AMQPConsumer consumer = new AMQPConsumer(connection, "queue1", true); + final AMQPConsumer consumer = new AMQPConsumer(connection, "queue1", true, processorLog); consumer.getChannel().basicPublish("myExchange", "key1", new BasicProperties(), new byte[0]); consumer.close(); - assertTrue(((TestChannel) consumer.getChannel()).isNack(0)); + + assertEquals(0, consumer.getResponseQueueSize()); } @Test(expected = IllegalArgumentException.class) public void failOnNullConnection() throws IOException { - new AMQPConsumer(null, null, true); + new AMQPConsumer(null, null, true, processorLog); } @Test(expected = IllegalArgumentException.class) public void failOnNullQueueName() throws Exception { Connection conn = new TestConnection(null, null); - new AMQPConsumer(conn, null, true); + new AMQPConsumer(conn, null, true, processorLog); } @Test(expected = IllegalArgumentException.class) public void failOnEmptyQueueName() throws Exception { Connection conn = new TestConnection(null, null); - new AMQPConsumer(conn, " ", true); + new AMQPConsumer(conn, " ", true, processorLog); } @Test(expected = IOException.class) public void failOnNonExistingQueue() throws Exception { Connection conn = new TestConnection(null, null); - try (AMQPConsumer consumer = new AMQPConsumer(conn, "hello", true)) { + try (AMQPConsumer consumer = new AMQPConsumer(conn, "hello", true, processorLog)) { consumer.consume(); } } @@ -83,7 +93,7 @@ public class AMQPConsumerTest { exchangeToRoutingKeymap.put("", "queue1"); Connection conn = new TestConnection(exchangeToRoutingKeymap, routingMap); - try (AMQPConsumer consumer = new AMQPConsumer(conn, "queue1", true)) { + try (AMQPConsumer consumer = new AMQPConsumer(conn, "queue1", true, processorLog)) { GetResponse response = consumer.consume(); assertNull(response); } @@ -98,7 +108,7 @@ public class AMQPConsumerTest { Connection conn = new TestConnection(exchangeToRoutingKeymap, routingMap); conn.createChannel().basicPublish("myExchange", "key1", null, "hello Joe".getBytes()); - try (AMQPConsumer consumer = new AMQPConsumer(conn, "queue1", true)) { + try (AMQPConsumer consumer = new AMQPConsumer(conn, "queue1", true, processorLog)) { GetResponse response = consumer.consume(); assertNotNull(response); } diff --git a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPPublisherTest.java b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPPublisherTest.java index 51bd59fa84..db3acd0708 100644 --- a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPPublisherTest.java +++ b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPPublisherTest.java @@ -43,7 +43,7 @@ public class AMQPPublisherTest { new AMQPPublisher(null, null); } - @Test(expected = IllegalStateException.class) + @Test(expected = AMQPRollbackException.class) public void failPublishIfChannelClosed() throws Exception { Connection conn = new TestConnection(null, null); try (AMQPPublisher sender = new AMQPPublisher(conn, mock(ComponentLog.class))) { @@ -52,7 +52,7 @@ public class AMQPPublisherTest { } } - @Test(expected = IllegalStateException.class) + @Test(expected = AMQPException.class) public void failPublishIfChannelFails() throws Exception { TestConnection conn = new TestConnection(null, null); try (AMQPPublisher sender = new AMQPPublisher(conn, mock(ComponentLog.class))) { diff --git a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/ConsumeAMQPTest.java b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/ConsumeAMQPTest.java index 1a7fc0ae59..b82e1a9753 100644 --- a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/ConsumeAMQPTest.java +++ b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/ConsumeAMQPTest.java @@ -26,6 +26,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeoutException; import org.apache.nifi.logging.ComponentLog; @@ -36,9 +37,7 @@ import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; import org.junit.Test; -import com.rabbitmq.client.AMQP.BasicProperties; import com.rabbitmq.client.Connection; -import com.rabbitmq.client.Envelope; import com.rabbitmq.client.MessageProperties; public class ConsumeAMQPTest { @@ -106,7 +105,7 @@ public class ConsumeAMQPTest { } @Test - public void testMessagesRejectedOnStop() throws TimeoutException, IOException { + public void testConsumerStopped() throws TimeoutException, IOException { final Map> routingMap = Collections.singletonMap("key1", Arrays.asList("queue1")); final Map exchangeToRoutingKeymap = Collections.singletonMap("myExchange", "key1"); @@ -133,13 +132,11 @@ public class ConsumeAMQPTest { // A single cumulative ack should be used assertTrue(((TestChannel) connection.createChannel()).isAck(0)); - // Messages 1 and 2 will have been delivered but on stop should be rejected. They will be rejected - // cumulatively, though, so only delivery Tag 2 will be nack'ed explicitly - assertTrue(((TestChannel) connection.createChannel()).isNack(2)); + assertFalse(((TestChannel) connection.createChannel()).isAck(1)); + assertFalse(((TestChannel) connection.createChannel()).isAck(2)); - // Any newly delivered messages should also be immediately nack'ed. - proc.getAMQPWorker().getConsumer().handleDelivery("123", new Envelope(3, false, "myExchange", "key1"), new BasicProperties(), new byte[0]); - assertTrue(((TestChannel) connection.createChannel()).isNack(3)); + assertFalse(connection.createChannel().isOpen()); + assertFalse(connection.isOpen()); } } @@ -186,7 +183,7 @@ public class ConsumeAMQPTest { throw new IllegalStateException("Consumer already created"); } - consumer = new AMQPConsumer(connection, context.getProperty(QUEUE).getValue(), context.getProperty(AUTO_ACKNOWLEDGE).asBoolean()); + consumer = new AMQPConsumer(connection, context.getProperty(QUEUE).getValue(), context.getProperty(AUTO_ACKNOWLEDGE).asBoolean(), getLogger()); return consumer; } catch (IOException e) { throw new ProcessException(e); @@ -198,7 +195,7 @@ public class ConsumeAMQPTest { } @Override - protected Connection createConnection(ProcessContext context) { + protected Connection createConnection(ProcessContext context, ExecutorService executor) { return connection; } } diff --git a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/PublishAMQPTest.java b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/PublishAMQPTest.java index 0464e8eddf..556d7b99f2 100644 --- a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/PublishAMQPTest.java +++ b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/PublishAMQPTest.java @@ -27,6 +27,7 @@ import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ExecutorService; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.util.MockFlowFile; @@ -135,7 +136,7 @@ public class PublishAMQPTest { } @Override - protected Connection createConnection(ProcessContext context) { + protected Connection createConnection(ProcessContext context, ExecutorService executor) { return connection; } diff --git a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/TestChannel.java b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/TestChannel.java index 1011f62bf3..7eaceae374 100644 --- a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/TestChannel.java +++ b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/TestChannel.java @@ -40,6 +40,7 @@ import com.rabbitmq.client.AMQP.Queue.PurgeOk; import com.rabbitmq.client.AMQP.Tx.CommitOk; import com.rabbitmq.client.AMQP.Tx.RollbackOk; import com.rabbitmq.client.AMQP.Tx.SelectOk; +import com.rabbitmq.client.AlreadyClosedException; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.CancelCallback; import com.rabbitmq.client.Channel; @@ -237,6 +238,9 @@ class TestChannel implements Channel { if (this.corrupted) { throw new IOException("Channel is corrupted"); } + if (!this.open) { + throw new AlreadyClosedException(new ShutdownSignalException(false, false, null, null)); + } if (exchange.equals("")){ // default exchange; routingKey corresponds to a queue. BlockingQueue messages = this.getMessageQueue(routingKey);