NIFI-11615: Logging fixes in AMQP bundle

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #7314.
This commit is contained in:
Peter Turcsanyi 2023-05-30 22:46:00 +02:00 committed by Pierre Villard
parent 818747d84b
commit 02374798d2
No known key found for this signature in database
GPG Key ID: F92A93B30C07C6D5
2 changed files with 4 additions and 4 deletions

View File

@ -58,7 +58,7 @@ final class AMQPConsumer extends AMQPWorker {
public void handleDelivery(final String consumerTag, final Envelope envelope, final BasicProperties properties, final byte[] body) throws IOException { public void handleDelivery(final String consumerTag, final Envelope envelope, final BasicProperties properties, final byte[] body) throws IOException {
if (closed) { if (closed) {
// simply discard the messages, all unacknowledged messages will be redelivered by the broker when the consumer connects again // simply discard the messages, all unacknowledged messages will be redelivered by the broker when the consumer connects again
processorLog.info("Consumer is closed, discarding message (delivery tag: {}).", new Object[]{envelope.getDeliveryTag()}); processorLog.info("Consumer is closed, discarding message (delivery tag: {}).", envelope.getDeliveryTag());
return; return;
} }
@ -123,7 +123,7 @@ final class AMQPConsumer extends AMQPWorker {
GetResponse response; GetResponse response;
while ((response = responseQueue.poll()) != null) { while ((response = responseQueue.poll()) != null) {
// simply discard the messages, all unacknowledged messages will be redelivered by the broker when the consumer connects again // simply discard the messages, all unacknowledged messages will be redelivered by the broker when the consumer connects again
processorLog.info("Consumer is closed, discarding message (delivery tag: {}).", new Object[]{response.getEnvelope().getDeliveryTag()}); processorLog.info("Consumer is closed, discarding message (delivery tag: {}).", response.getEnvelope().getDeliveryTag());
} }
} catch (Exception e) { } catch (Exception e) {
processorLog.error("Failed to drain response queue."); processorLog.error("Failed to drain response queue.");

View File

@ -325,7 +325,7 @@ abstract class AbstractAMQPProcessor<T extends AMQPWorker> extends AbstractProce
cf.setExceptionHandler(new DefaultExceptionHandler() { cf.setExceptionHandler(new DefaultExceptionHandler() {
@Override @Override
public void handleUnexpectedConnectionDriverException(Connection conn, Throwable exception) { public void handleUnexpectedConnectionDriverException(Connection conn, Throwable exception) {
getLogger().error("Connection lost to server {}:{}.", new Object[]{conn.getAddress(), conn.getPort()}, exception); getLogger().error("Connection lost to server {}:{}.", conn.getAddress(), conn.getPort(), exception);
} }
}); });
@ -342,7 +342,7 @@ abstract class AbstractAMQPProcessor<T extends AMQPWorker> extends AbstractProce
return connection; return connection;
} catch (Exception e) { } catch (Exception e) {
throw new IllegalStateException("Failed to establish connection with AMQP Broker: " + cf.toString(), e); throw new IllegalStateException(String.format("Failed to establish connection with AMQP Broker: %s:%s", cf.getHost(), cf.getPort()), e);
} }
} }
} }