mirror of https://github.com/apache/nifi.git
NIFI-6312: Improved connection handling in AMQP processors
Disable connection automatic recovery which can lead to uncontrolled/stale threads. Handle the recovery in the processors instead. Use poisoning in case of errors, then discarding and recreating the poisoned consumer/publisher. NIFI-6312: Use conventional exception handling instead of poisoning Use component logger in workers. Remove basicNack()/basicReject() calls as they are not needed because all unacknowledged messages will be redelivered. NIFI-6312: Further improve exception handling and error logging. NIFI-6312: Fix consumer closing in previous commit NIFI-6312: Use custom executor with a single thread (no more is used by the processor) Reviewed by tamas palfy and simon bence Signed-off-by: Joe Witt <joewitt@apache.org>
This commit is contained in:
parent
ddb304a927
commit
7b4cce9e21
|
@ -21,9 +21,8 @@ import java.util.concurrent.BlockingQueue;
|
|||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
import org.apache.nifi.logging.ComponentLog;
|
||||
import org.apache.nifi.processor.exception.ProcessException;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.rabbitmq.client.AMQP.BasicProperties;
|
||||
import com.rabbitmq.client.Channel;
|
||||
|
@ -39,30 +38,27 @@ import com.rabbitmq.client.GetResponse;
|
|||
*/
|
||||
final class AMQPConsumer extends AMQPWorker {
|
||||
|
||||
private final static Logger logger = LoggerFactory.getLogger(AMQPConsumer.class);
|
||||
private final String queueName;
|
||||
private final BlockingQueue<GetResponse> responseQueue;
|
||||
private final boolean autoAcknowledge;
|
||||
private final Consumer consumer;
|
||||
|
||||
private volatile boolean closed = false;
|
||||
|
||||
|
||||
AMQPConsumer(final Connection connection, final String queueName, final boolean autoAcknowledge) throws IOException {
|
||||
super(connection);
|
||||
AMQPConsumer(final Connection connection, final String queueName, final boolean autoAcknowledge, ComponentLog processorLog) throws IOException {
|
||||
super(connection, processorLog);
|
||||
this.validateStringProperty("queueName", queueName);
|
||||
this.queueName = queueName;
|
||||
this.autoAcknowledge = autoAcknowledge;
|
||||
this.responseQueue = new LinkedBlockingQueue<>(10);
|
||||
|
||||
logger.info("Successfully connected AMQPConsumer to " + connection.toString() + " and '" + queueName + "' queue");
|
||||
processorLog.info("Successfully connected AMQPConsumer to " + connection.toString() + " and '" + queueName + "' queue");
|
||||
|
||||
final Channel channel = getChannel();
|
||||
consumer = new DefaultConsumer(channel) {
|
||||
@Override
|
||||
public void handleDelivery(final String consumerTag, final Envelope envelope, final BasicProperties properties, final byte[] body) throws IOException {
|
||||
if (!autoAcknowledge && closed) {
|
||||
channel.basicReject(envelope.getDeliveryTag(), true);
|
||||
if (closed) {
|
||||
// simply discard the messages, all unacknowledged messages will be redelivered by the broker when the consumer connects again
|
||||
processorLog.info("Consumer is closed, discarding message (delivery tag: {}).", new Object[]{envelope.getDeliveryTag()});
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -78,8 +74,8 @@ final class AMQPConsumer extends AMQPWorker {
|
|||
}
|
||||
|
||||
// Visible for unit tests
|
||||
protected Consumer getConsumer() {
|
||||
return consumer;
|
||||
int getResponseQueueSize() {
|
||||
return responseQueue.size();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -96,26 +92,32 @@ final class AMQPConsumer extends AMQPWorker {
|
|||
return responseQueue.poll();
|
||||
}
|
||||
|
||||
public void acknowledge(final GetResponse response) throws IOException {
|
||||
public void acknowledge(final GetResponse response) {
|
||||
if (autoAcknowledge) {
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
getChannel().basicAck(response.getEnvelope().getDeliveryTag(), true);
|
||||
} catch (Exception e) {
|
||||
throw new AMQPException("Failed to acknowledge message", e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws TimeoutException, IOException {
|
||||
closed = true;
|
||||
|
||||
GetResponse lastMessage = null;
|
||||
try {
|
||||
super.close();
|
||||
} finally {
|
||||
try {
|
||||
GetResponse response;
|
||||
while ((response = responseQueue.poll()) != null) {
|
||||
lastMessage = response;
|
||||
// simply discard the messages, all unacknowledged messages will be redelivered by the broker when the consumer connects again
|
||||
processorLog.info("Consumer is closed, discarding message (delivery tag: {}).", new Object[]{response.getEnvelope().getDeliveryTag()});
|
||||
}
|
||||
} catch (Exception e) {
|
||||
processorLog.error("Failed to drain response queue.");
|
||||
}
|
||||
|
||||
if (lastMessage != null) {
|
||||
getChannel().basicNack(lastMessage.getEnvelope().getDeliveryTag(), true, true);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,32 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.amqp.processors;
|
||||
|
||||
/**
|
||||
* Exception to indicate an AMQP related error when the FlowFile should not be tried to process again but it should be sent to failure.
|
||||
* AMQPException and AMQPRollbackException are not interchangeable because of the difference in the expected error handling.
|
||||
*/
|
||||
public class AMQPException extends RuntimeException {
|
||||
|
||||
public AMQPException(String message) {
|
||||
super(message);
|
||||
}
|
||||
|
||||
public AMQPException(String message, Throwable cause) {
|
||||
super(message, cause);
|
||||
}
|
||||
}
|
|
@ -17,11 +17,12 @@
|
|||
package org.apache.nifi.amqp.processors;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.SocketException;
|
||||
|
||||
import com.rabbitmq.client.AlreadyClosedException;
|
||||
import org.apache.nifi.logging.ComponentLog;
|
||||
|
||||
import com.rabbitmq.client.AMQP.BasicProperties;
|
||||
import com.rabbitmq.client.Channel;
|
||||
import com.rabbitmq.client.Connection;
|
||||
import com.rabbitmq.client.ReturnListener;
|
||||
|
||||
|
@ -31,7 +32,6 @@ import com.rabbitmq.client.ReturnListener;
|
|||
*/
|
||||
final class AMQPPublisher extends AMQPWorker {
|
||||
|
||||
private final ComponentLog processLog;
|
||||
private final String connectionString;
|
||||
|
||||
/**
|
||||
|
@ -39,11 +39,12 @@ final class AMQPPublisher extends AMQPWorker {
|
|||
*
|
||||
* @param connection instance of AMQP {@link Connection}
|
||||
*/
|
||||
AMQPPublisher(Connection connection, ComponentLog processLog) {
|
||||
super(connection);
|
||||
this.processLog = processLog;
|
||||
AMQPPublisher(Connection connection, ComponentLog processorLog) {
|
||||
super(connection, processorLog);
|
||||
getChannel().addReturnListener(new UndeliverableMessageLogger());
|
||||
this.connectionString = connection.toString();
|
||||
|
||||
processorLog.info("Successfully connected AMQPPublisher to " + this.connectionString);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -60,21 +61,21 @@ final class AMQPPublisher extends AMQPWorker {
|
|||
void publish(byte[] bytes, BasicProperties properties, String routingKey, String exchange) {
|
||||
this.validateStringProperty("routingKey", routingKey);
|
||||
exchange = exchange == null ? "" : exchange.trim();
|
||||
if (exchange.length() == 0) {
|
||||
processLog.info("The 'exchangeName' is not specified. Messages will be sent to default exchange");
|
||||
}
|
||||
processLog.info("Successfully connected AMQPPublisher to " + this.connectionString + " and '" + exchange
|
||||
+ "' exchange with '" + routingKey + "' as a routing key.");
|
||||
|
||||
final Channel channel = getChannel();
|
||||
if (channel.isOpen()) {
|
||||
try {
|
||||
channel.basicPublish(exchange, routingKey, true, properties, bytes);
|
||||
} catch (Exception e) {
|
||||
throw new IllegalStateException("Failed to publish to Exchange '" + exchange + "' with Routing Key '" + routingKey + "'.", e);
|
||||
if (processorLog.isDebugEnabled()) {
|
||||
if (exchange.length() == 0) {
|
||||
processorLog.debug("The 'exchangeName' is not specified. Messages will be sent to default exchange");
|
||||
}
|
||||
} else {
|
||||
throw new IllegalStateException("This instance of AMQPPublisher is invalid since its publishingChannel is closed");
|
||||
processorLog.debug("Successfully connected AMQPPublisher to " + this.connectionString + " and '" + exchange
|
||||
+ "' exchange with '" + routingKey + "' as a routing key.");
|
||||
}
|
||||
|
||||
try {
|
||||
getChannel().basicPublish(exchange, routingKey, true, properties, bytes);
|
||||
} catch (AlreadyClosedException | SocketException e) {
|
||||
throw new AMQPRollbackException("Failed to publish message because the AMQP connection is lost or has been closed", e);
|
||||
} catch (Exception e) {
|
||||
throw new AMQPException("Failed to publish message to Exchange '" + exchange + "' with Routing Key '" + routingKey + "'.", e);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -100,8 +101,7 @@ final class AMQPPublisher extends AMQPWorker {
|
|||
public void handleReturn(int replyCode, String replyText, String exchangeName, String routingKey, BasicProperties properties, byte[] message) throws IOException {
|
||||
String logMessage = "Message destined for '" + exchangeName + "' exchange with '" + routingKey
|
||||
+ "' as routing key came back with replyCode=" + replyCode + " and replyText=" + replyText + ".";
|
||||
processLog.warn(logMessage);
|
||||
AMQPPublisher.this.processLog.warn(logMessage);
|
||||
processorLog.warn(logMessage);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,17 +19,19 @@ package org.apache.nifi.amqp.processors;
|
|||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
|
||||
import com.rabbitmq.client.Connection;
|
||||
|
||||
public class AMQPResource<T extends AMQPWorker> implements Closeable {
|
||||
private final Connection connection;
|
||||
private final ExecutorService executor;
|
||||
private final T worker;
|
||||
|
||||
public AMQPResource(final Connection connection, final T worker) {
|
||||
public AMQPResource(final Connection connection, final T worker, final ExecutorService executor) {
|
||||
this.connection = connection;
|
||||
this.worker = worker;
|
||||
this.executor = executor;
|
||||
}
|
||||
|
||||
public Connection getConnection() {
|
||||
|
@ -48,23 +50,40 @@ public class AMQPResource<T extends AMQPWorker> implements Closeable {
|
|||
worker.close();
|
||||
} catch (final IOException e) {
|
||||
ioe = e;
|
||||
} catch (final TimeoutException e) {
|
||||
} catch (final Exception e) {
|
||||
ioe = new IOException(e);
|
||||
}
|
||||
|
||||
try {
|
||||
if (connection.isOpen()) {
|
||||
connection.close();
|
||||
}
|
||||
} catch (final IOException e) {
|
||||
if (ioe == null) {
|
||||
ioe = e;
|
||||
} else {
|
||||
ioe.addSuppressed(e);
|
||||
}
|
||||
} catch (final Exception e) {
|
||||
if (ioe == null) {
|
||||
ioe = new IOException(e);
|
||||
} else {
|
||||
ioe.addSuppressed(e);
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
executor.shutdown();
|
||||
} catch (final Exception e) {
|
||||
if (ioe == null) {
|
||||
ioe = new IOException(e);
|
||||
} else {
|
||||
ioe.addSuppressed(e);
|
||||
}
|
||||
}
|
||||
|
||||
if (ioe != null) {
|
||||
throw ioe;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -0,0 +1,32 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.amqp.processors;
|
||||
|
||||
/**
|
||||
* Exception to indicate an AMQP related error when the FlowFile should be tried to process again so the NiFi session should be rolled back.
|
||||
* AMQPRollbackException and AMQPException are not interchangeable because of the difference in the expected error handling.
|
||||
*/
|
||||
public class AMQPRollbackException extends RuntimeException {
|
||||
|
||||
public AMQPRollbackException(String message) {
|
||||
super(message);
|
||||
}
|
||||
|
||||
public AMQPRollbackException(String message, Throwable cause) {
|
||||
super(message, cause);
|
||||
}
|
||||
}
|
|
@ -19,8 +19,7 @@ package org.apache.nifi.amqp.processors;
|
|||
import java.io.IOException;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.apache.nifi.logging.ComponentLog;
|
||||
|
||||
import com.rabbitmq.client.Channel;
|
||||
import com.rabbitmq.client.Connection;
|
||||
|
@ -33,9 +32,10 @@ import com.rabbitmq.client.Connection;
|
|||
*/
|
||||
abstract class AMQPWorker implements AutoCloseable {
|
||||
|
||||
private final static Logger logger = LoggerFactory.getLogger(AMQPWorker.class);
|
||||
protected final ComponentLog processorLog;
|
||||
|
||||
private final Channel channel;
|
||||
private boolean closed = false;
|
||||
protected volatile boolean closed = false;
|
||||
|
||||
/**
|
||||
* Creates an instance of this worker initializing it with AMQP
|
||||
|
@ -44,13 +44,15 @@ abstract class AMQPWorker implements AutoCloseable {
|
|||
*
|
||||
* @param connection instance of {@link Connection}
|
||||
*/
|
||||
public AMQPWorker(final Connection connection) {
|
||||
public AMQPWorker(final Connection connection, ComponentLog processorLog) {
|
||||
this.processorLog = processorLog;
|
||||
|
||||
validateConnection(connection);
|
||||
|
||||
try {
|
||||
this.channel = connection.createChannel();
|
||||
} catch (IOException e) {
|
||||
logger.error("Failed to create Channel for " + connection, e);
|
||||
processorLog.error("Failed to create Channel for " + connection, e);
|
||||
throw new IllegalStateException(e);
|
||||
}
|
||||
}
|
||||
|
@ -59,18 +61,19 @@ abstract class AMQPWorker implements AutoCloseable {
|
|||
return channel;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void close() throws TimeoutException, IOException {
|
||||
if (closed) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("Closing AMQP channel for " + this.channel.getConnection().toString());
|
||||
if (channel.isOpen()) {
|
||||
if (processorLog.isDebugEnabled()) {
|
||||
processorLog.debug("Closing AMQP channel for " + this.channel.getConnection().toString());
|
||||
}
|
||||
|
||||
this.channel.close();
|
||||
}
|
||||
closed = true;
|
||||
}
|
||||
|
||||
|
|
|
@ -24,8 +24,14 @@ import java.util.Collection;
|
|||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import javax.net.ssl.SSLContext;
|
||||
|
||||
import com.rabbitmq.client.impl.DefaultExceptionHandler;
|
||||
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
|
||||
import org.apache.nifi.annotation.lifecycle.OnScheduled;
|
||||
import org.apache.nifi.annotation.lifecycle.OnStopped;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.components.ValidationContext;
|
||||
|
@ -139,8 +145,12 @@ abstract class AbstractAMQPProcessor<T extends AMQPWorker> extends AbstractProce
|
|||
return propertyDescriptors;
|
||||
}
|
||||
|
||||
private final BlockingQueue<AMQPResource<T>> resourceQueue = new LinkedBlockingQueue<>();
|
||||
private BlockingQueue<AMQPResource<T>> resourceQueue;
|
||||
|
||||
@OnScheduled
|
||||
public void onScheduled(ProcessContext context) {
|
||||
resourceQueue = new LinkedBlockingQueue<>(context.getMaxConcurrentTasks());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Collection<ValidationResult> customValidate(ValidationContext context) {
|
||||
|
@ -190,33 +200,49 @@ abstract class AbstractAMQPProcessor<T extends AMQPWorker> extends AbstractProce
|
|||
public final void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
|
||||
AMQPResource<T> resource = resourceQueue.poll();
|
||||
if (resource == null) {
|
||||
try {
|
||||
resource = createResource(context);
|
||||
} catch (Exception e) {
|
||||
getLogger().error("Failed to initialize AMQP client", e);
|
||||
context.yield();
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
processResource(resource.getConnection(), resource.getWorker(), context, session);
|
||||
resourceQueue.offer(resource);
|
||||
} catch (final Exception e) {
|
||||
try {
|
||||
resource.close();
|
||||
} catch (final Exception e2) {
|
||||
e.addSuppressed(e2);
|
||||
}
|
||||
|
||||
throw e;
|
||||
if (!resourceQueue.offer(resource)) {
|
||||
getLogger().info("Worker queue is full, closing AMQP client");
|
||||
closeResource(resource);
|
||||
}
|
||||
} catch (AMQPException | AMQPRollbackException e) {
|
||||
getLogger().error("AMQP failure, dropping the client", e);
|
||||
context.yield();
|
||||
closeResource(resource);
|
||||
} catch (Exception e) {
|
||||
getLogger().error("Processor failure", e);
|
||||
context.yield();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@OnStopped
|
||||
public void close() {
|
||||
if (resourceQueue != null) {
|
||||
AMQPResource<T> resource;
|
||||
while ((resource = resourceQueue.poll()) != null) {
|
||||
closeResource(resource);
|
||||
}
|
||||
resourceQueue = null;
|
||||
}
|
||||
}
|
||||
|
||||
private void closeResource(AMQPResource<T> resource) {
|
||||
try {
|
||||
resource.close();
|
||||
} catch (final Exception e) {
|
||||
getLogger().warn("Failed to close AMQP Connection", e);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
getLogger().error("Failed to close AMQP Connection", e);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -235,13 +261,28 @@ abstract class AbstractAMQPProcessor<T extends AMQPWorker> extends AbstractProce
|
|||
|
||||
|
||||
private AMQPResource<T> createResource(final ProcessContext context) {
|
||||
final Connection connection = createConnection(context);
|
||||
final T worker = createAMQPWorker(context, connection);
|
||||
return new AMQPResource<>(connection, worker);
|
||||
Connection connection = null;
|
||||
try {
|
||||
ExecutorService executor = Executors.newSingleThreadExecutor(new BasicThreadFactory.Builder()
|
||||
.namingPattern("AMQP Consumer: " + getIdentifier())
|
||||
.build());
|
||||
connection = createConnection(context, executor);
|
||||
T worker = createAMQPWorker(context, connection);
|
||||
return new AMQPResource<>(connection, worker, executor);
|
||||
} catch (Exception e) {
|
||||
if (connection != null && connection.isOpen()) {
|
||||
try {
|
||||
connection.close();
|
||||
} catch (Exception closingEx) {
|
||||
getLogger().error("Failed to close AMQP Connection", closingEx);
|
||||
}
|
||||
}
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
protected Connection createConnection(ProcessContext context) {
|
||||
protected Connection createConnection(ProcessContext context, ExecutorService executor) {
|
||||
final ConnectionFactory cf = new ConnectionFactory();
|
||||
cf.setHost(context.getProperty(HOST).evaluateAttributeExpressions().getValue());
|
||||
cf.setPort(Integer.parseInt(context.getProperty(PORT).evaluateAttributeExpressions().getValue()));
|
||||
|
@ -268,8 +309,16 @@ abstract class AbstractAMQPProcessor<T extends AMQPWorker> extends AbstractProce
|
|||
}
|
||||
}
|
||||
|
||||
cf.setAutomaticRecoveryEnabled(false);
|
||||
cf.setExceptionHandler(new DefaultExceptionHandler() {
|
||||
@Override
|
||||
public void handleUnexpectedConnectionDriverException(Connection conn, Throwable exception) {
|
||||
getLogger().error("Connection lost to server {}:{}.", new Object[]{conn.getAddress(), conn.getPort()}, exception);
|
||||
}
|
||||
});
|
||||
|
||||
try {
|
||||
Connection connection = cf.newConnection();
|
||||
Connection connection = cf.newConnection(executor);
|
||||
return connection;
|
||||
} catch (Exception e) {
|
||||
throw new IllegalStateException("Failed to establish connection with AMQP Broker: " + cf.toString(), e);
|
||||
|
|
|
@ -75,9 +75,12 @@ public class ConsumeAMQP extends AbstractAMQPProcessor<AMQPConsumer> {
|
|||
.build();
|
||||
static final PropertyDescriptor AUTO_ACKNOWLEDGE = new PropertyDescriptor.Builder()
|
||||
.name("auto.acknowledge")
|
||||
.displayName("Auto-Acknowledge messages")
|
||||
.description("If true, messages that are received will be auto-acknowledged by the AMQP Broker. "
|
||||
+ "This generally will provide better throughput but could result in messages being lost upon restart of NiFi")
|
||||
.displayName("Auto-Acknowledge Messages")
|
||||
.description(" If false (Non-Auto-Acknowledge), the messages will be acknowledged by the processor after transferring the FlowFiles to success and committing "
|
||||
+ "the NiFi session. Non-Auto-Acknowledge mode provides 'at-least-once' delivery semantics. "
|
||||
+ "If true (Auto-Acknowledge), messages that are delivered to the AMQP Client will be auto-acknowledged by the AMQP Broker just after sending them out. "
|
||||
+ "This generally will provide better throughput but will also result in messages being lost upon restart/crash of the AMQP Broker, NiFi or the processor. "
|
||||
+ "Auto-Acknowledge mode provides 'at-most-once' delivery semantics and it is recommended only if loosing messages is acceptable.")
|
||||
.allowableValues("true", "false")
|
||||
.defaultValue("false")
|
||||
.required(true)
|
||||
|
@ -85,8 +88,8 @@ public class ConsumeAMQP extends AbstractAMQPProcessor<AMQPConsumer> {
|
|||
static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
|
||||
.name("batch.size")
|
||||
.displayName("Batch Size")
|
||||
.description("The maximum number of messages that should be pulled in a single session. Once this many messages have been received (or once no more messages are readily available), "
|
||||
+ "the messages received will be transferred to the 'success' relationship and the messages will be acknowledged with the AMQP Broker. Setting this value to a larger number "
|
||||
.description("The maximum number of messages that should be processed in a single session. Once this many messages have been received (or once no more messages are readily available), "
|
||||
+ "the messages received will be transferred to the 'success' relationship and the messages will be acknowledged to the AMQP Broker. Setting this value to a larger number "
|
||||
+ "could result in better performance, particularly for very small messages, but can also result in more messages being duplicated upon sudden restart of NiFi.")
|
||||
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
|
||||
|
@ -124,6 +127,10 @@ public class ConsumeAMQP extends AbstractAMQPProcessor<AMQPConsumer> {
|
|||
protected void processResource(final Connection connection, final AMQPConsumer consumer, final ProcessContext context, final ProcessSession session) {
|
||||
GetResponse lastReceived = null;
|
||||
|
||||
if (!connection.isOpen() || !consumer.getChannel().isOpen()) {
|
||||
throw new AMQPException("AMQP client has lost connection.");
|
||||
}
|
||||
|
||||
for (int i = 0; i < context.getProperty(BATCH_SIZE).asInteger(); i++) {
|
||||
final GetResponse response = consumer.consume();
|
||||
if (response == null) {
|
||||
|
@ -147,14 +154,9 @@ public class ConsumeAMQP extends AbstractAMQPProcessor<AMQPConsumer> {
|
|||
lastReceived = response;
|
||||
}
|
||||
|
||||
session.commit();
|
||||
|
||||
if (lastReceived != null) {
|
||||
try {
|
||||
session.commit();
|
||||
consumer.acknowledge(lastReceived);
|
||||
} catch (IOException e) {
|
||||
throw new ProcessException("Failed to acknowledge message", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -190,17 +192,10 @@ public class ConsumeAMQP extends AbstractAMQPProcessor<AMQPConsumer> {
|
|||
try {
|
||||
final String queueName = context.getProperty(QUEUE).getValue();
|
||||
final boolean autoAcknowledge = context.getProperty(AUTO_ACKNOWLEDGE).asBoolean();
|
||||
final AMQPConsumer amqpConsumer = new AMQPConsumer(connection, queueName, autoAcknowledge);
|
||||
final AMQPConsumer amqpConsumer = new AMQPConsumer(connection, queueName, autoAcknowledge, getLogger());
|
||||
|
||||
return amqpConsumer;
|
||||
} catch (final IOException ioe) {
|
||||
try {
|
||||
connection.close();
|
||||
getLogger().warn("Closed connection at port " + connection.getPort());
|
||||
} catch (final IOException ioeClose) {
|
||||
throw new ProcessException("Failed to close connection at port " + connection.getPort());
|
||||
}
|
||||
|
||||
throw new ProcessException("Failed to connect to AMQP Broker", ioe);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -155,12 +155,16 @@ public class PublishAMQP extends AbstractAMQPProcessor<AMQPPublisher> {
|
|||
|
||||
try {
|
||||
publisher.publish(messageContent, amqpProperties, routingKey, exchange);
|
||||
} catch (AMQPRollbackException e) {
|
||||
session.rollback();
|
||||
throw e;
|
||||
} catch (AMQPException e) {
|
||||
session.transfer(session.penalize(flowFile), REL_FAILURE);
|
||||
throw e;
|
||||
}
|
||||
|
||||
session.transfer(flowFile, REL_SUCCESS);
|
||||
session.getProvenanceReporter().send(flowFile, connection.toString() + "/E:" + exchange + "/RK:" + routingKey);
|
||||
} catch (Exception e) {
|
||||
session.transfer(session.penalize(flowFile), REL_FAILURE);
|
||||
getLogger().error("Failed while sending message to AMQP via " + publisher, e);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -16,9 +16,10 @@
|
|||
*/
|
||||
package org.apache.nifi.amqp.processors;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.mockito.Mockito.mock;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
|
@ -28,6 +29,8 @@ import java.util.List;
|
|||
import java.util.Map;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
import org.apache.nifi.logging.ComponentLog;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import com.rabbitmq.client.AMQP.BasicProperties;
|
||||
|
@ -36,41 +39,48 @@ import com.rabbitmq.client.GetResponse;
|
|||
|
||||
public class AMQPConsumerTest {
|
||||
|
||||
private ComponentLog processorLog;
|
||||
|
||||
@Before
|
||||
public void setUp() {
|
||||
processorLog = mock(ComponentLog.class);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUnconsumedMessagesNacked() throws TimeoutException, IOException {
|
||||
public void testResponseQueueDrained() throws TimeoutException, IOException {
|
||||
final Map<String, List<String>> routingMap = Collections.singletonMap("key1", Arrays.asList("queue1", "queue2"));
|
||||
final Map<String, String> exchangeToRoutingKeymap = Collections.singletonMap("myExchange", "key1");
|
||||
|
||||
final TestConnection connection = new TestConnection(exchangeToRoutingKeymap, routingMap);
|
||||
final AMQPConsumer consumer = new AMQPConsumer(connection, "queue1", true);
|
||||
final AMQPConsumer consumer = new AMQPConsumer(connection, "queue1", true, processorLog);
|
||||
consumer.getChannel().basicPublish("myExchange", "key1", new BasicProperties(), new byte[0]);
|
||||
|
||||
consumer.close();
|
||||
assertTrue(((TestChannel) consumer.getChannel()).isNack(0));
|
||||
|
||||
assertEquals(0, consumer.getResponseQueueSize());
|
||||
}
|
||||
|
||||
@Test(expected = IllegalArgumentException.class)
|
||||
public void failOnNullConnection() throws IOException {
|
||||
new AMQPConsumer(null, null, true);
|
||||
new AMQPConsumer(null, null, true, processorLog);
|
||||
}
|
||||
|
||||
@Test(expected = IllegalArgumentException.class)
|
||||
public void failOnNullQueueName() throws Exception {
|
||||
Connection conn = new TestConnection(null, null);
|
||||
new AMQPConsumer(conn, null, true);
|
||||
new AMQPConsumer(conn, null, true, processorLog);
|
||||
}
|
||||
|
||||
@Test(expected = IllegalArgumentException.class)
|
||||
public void failOnEmptyQueueName() throws Exception {
|
||||
Connection conn = new TestConnection(null, null);
|
||||
new AMQPConsumer(conn, " ", true);
|
||||
new AMQPConsumer(conn, " ", true, processorLog);
|
||||
}
|
||||
|
||||
@Test(expected = IOException.class)
|
||||
public void failOnNonExistingQueue() throws Exception {
|
||||
Connection conn = new TestConnection(null, null);
|
||||
try (AMQPConsumer consumer = new AMQPConsumer(conn, "hello", true)) {
|
||||
try (AMQPConsumer consumer = new AMQPConsumer(conn, "hello", true, processorLog)) {
|
||||
consumer.consume();
|
||||
}
|
||||
}
|
||||
|
@ -83,7 +93,7 @@ public class AMQPConsumerTest {
|
|||
exchangeToRoutingKeymap.put("", "queue1");
|
||||
|
||||
Connection conn = new TestConnection(exchangeToRoutingKeymap, routingMap);
|
||||
try (AMQPConsumer consumer = new AMQPConsumer(conn, "queue1", true)) {
|
||||
try (AMQPConsumer consumer = new AMQPConsumer(conn, "queue1", true, processorLog)) {
|
||||
GetResponse response = consumer.consume();
|
||||
assertNull(response);
|
||||
}
|
||||
|
@ -98,7 +108,7 @@ public class AMQPConsumerTest {
|
|||
|
||||
Connection conn = new TestConnection(exchangeToRoutingKeymap, routingMap);
|
||||
conn.createChannel().basicPublish("myExchange", "key1", null, "hello Joe".getBytes());
|
||||
try (AMQPConsumer consumer = new AMQPConsumer(conn, "queue1", true)) {
|
||||
try (AMQPConsumer consumer = new AMQPConsumer(conn, "queue1", true, processorLog)) {
|
||||
GetResponse response = consumer.consume();
|
||||
assertNotNull(response);
|
||||
}
|
||||
|
|
|
@ -43,7 +43,7 @@ public class AMQPPublisherTest {
|
|||
new AMQPPublisher(null, null);
|
||||
}
|
||||
|
||||
@Test(expected = IllegalStateException.class)
|
||||
@Test(expected = AMQPRollbackException.class)
|
||||
public void failPublishIfChannelClosed() throws Exception {
|
||||
Connection conn = new TestConnection(null, null);
|
||||
try (AMQPPublisher sender = new AMQPPublisher(conn, mock(ComponentLog.class))) {
|
||||
|
@ -52,7 +52,7 @@ public class AMQPPublisherTest {
|
|||
}
|
||||
}
|
||||
|
||||
@Test(expected = IllegalStateException.class)
|
||||
@Test(expected = AMQPException.class)
|
||||
public void failPublishIfChannelFails() throws Exception {
|
||||
TestConnection conn = new TestConnection(null, null);
|
||||
try (AMQPPublisher sender = new AMQPPublisher(conn, mock(ComponentLog.class))) {
|
||||
|
|
|
@ -26,6 +26,7 @@ import java.util.Arrays;
|
|||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
import org.apache.nifi.logging.ComponentLog;
|
||||
|
@ -36,9 +37,7 @@ import org.apache.nifi.util.TestRunner;
|
|||
import org.apache.nifi.util.TestRunners;
|
||||
import org.junit.Test;
|
||||
|
||||
import com.rabbitmq.client.AMQP.BasicProperties;
|
||||
import com.rabbitmq.client.Connection;
|
||||
import com.rabbitmq.client.Envelope;
|
||||
import com.rabbitmq.client.MessageProperties;
|
||||
|
||||
public class ConsumeAMQPTest {
|
||||
|
@ -106,7 +105,7 @@ public class ConsumeAMQPTest {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testMessagesRejectedOnStop() throws TimeoutException, IOException {
|
||||
public void testConsumerStopped() throws TimeoutException, IOException {
|
||||
final Map<String, List<String>> routingMap = Collections.singletonMap("key1", Arrays.asList("queue1"));
|
||||
final Map<String, String> exchangeToRoutingKeymap = Collections.singletonMap("myExchange", "key1");
|
||||
|
||||
|
@ -133,13 +132,11 @@ public class ConsumeAMQPTest {
|
|||
// A single cumulative ack should be used
|
||||
assertTrue(((TestChannel) connection.createChannel()).isAck(0));
|
||||
|
||||
// Messages 1 and 2 will have been delivered but on stop should be rejected. They will be rejected
|
||||
// cumulatively, though, so only delivery Tag 2 will be nack'ed explicitly
|
||||
assertTrue(((TestChannel) connection.createChannel()).isNack(2));
|
||||
assertFalse(((TestChannel) connection.createChannel()).isAck(1));
|
||||
assertFalse(((TestChannel) connection.createChannel()).isAck(2));
|
||||
|
||||
// Any newly delivered messages should also be immediately nack'ed.
|
||||
proc.getAMQPWorker().getConsumer().handleDelivery("123", new Envelope(3, false, "myExchange", "key1"), new BasicProperties(), new byte[0]);
|
||||
assertTrue(((TestChannel) connection.createChannel()).isNack(3));
|
||||
assertFalse(connection.createChannel().isOpen());
|
||||
assertFalse(connection.isOpen());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -186,7 +183,7 @@ public class ConsumeAMQPTest {
|
|||
throw new IllegalStateException("Consumer already created");
|
||||
}
|
||||
|
||||
consumer = new AMQPConsumer(connection, context.getProperty(QUEUE).getValue(), context.getProperty(AUTO_ACKNOWLEDGE).asBoolean());
|
||||
consumer = new AMQPConsumer(connection, context.getProperty(QUEUE).getValue(), context.getProperty(AUTO_ACKNOWLEDGE).asBoolean(), getLogger());
|
||||
return consumer;
|
||||
} catch (IOException e) {
|
||||
throw new ProcessException(e);
|
||||
|
@ -198,7 +195,7 @@ public class ConsumeAMQPTest {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected Connection createConnection(ProcessContext context) {
|
||||
protected Connection createConnection(ProcessContext context, ExecutorService executor) {
|
||||
return connection;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -27,6 +27,7 @@ import java.util.Date;
|
|||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
|
||||
import org.apache.nifi.processor.ProcessContext;
|
||||
import org.apache.nifi.util.MockFlowFile;
|
||||
|
@ -135,7 +136,7 @@ public class PublishAMQPTest {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected Connection createConnection(ProcessContext context) {
|
||||
protected Connection createConnection(ProcessContext context, ExecutorService executor) {
|
||||
return connection;
|
||||
}
|
||||
|
||||
|
|
|
@ -40,6 +40,7 @@ import com.rabbitmq.client.AMQP.Queue.PurgeOk;
|
|||
import com.rabbitmq.client.AMQP.Tx.CommitOk;
|
||||
import com.rabbitmq.client.AMQP.Tx.RollbackOk;
|
||||
import com.rabbitmq.client.AMQP.Tx.SelectOk;
|
||||
import com.rabbitmq.client.AlreadyClosedException;
|
||||
import com.rabbitmq.client.BuiltinExchangeType;
|
||||
import com.rabbitmq.client.CancelCallback;
|
||||
import com.rabbitmq.client.Channel;
|
||||
|
@ -237,6 +238,9 @@ class TestChannel implements Channel {
|
|||
if (this.corrupted) {
|
||||
throw new IOException("Channel is corrupted");
|
||||
}
|
||||
if (!this.open) {
|
||||
throw new AlreadyClosedException(new ShutdownSignalException(false, false, null, null));
|
||||
}
|
||||
|
||||
if (exchange.equals("")){ // default exchange; routingKey corresponds to a queue.
|
||||
BlockingQueue<GetResponse> messages = this.getMessageQueue(routingKey);
|
||||
|
|
Loading…
Reference in New Issue