From 3e8db53f8822d98c007d9d95b10a7d3944d8124e Mon Sep 17 00:00:00 2001 From: Peter Turcsanyi Date: Wed, 1 Nov 2023 17:09:57 +0100 Subject: [PATCH] NIFI-12306 ConsumeAzureEventHub logs partition ownership changes at info level This closes #7970 Signed-off-by: David Handermann --- .../azure/eventhub/ConsumeAzureEventHub.java | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java index a3aef2affd..33b80f0daa 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java @@ -17,6 +17,8 @@ package org.apache.nifi.processors.azure.eventhub; import com.azure.core.amqp.AmqpTransportType; +import com.azure.core.amqp.exception.AmqpErrorCondition; +import com.azure.core.amqp.exception.AmqpException; import com.azure.core.credential.AzureNamedKeyCredential; import com.azure.identity.ManagedIdentityCredential; import com.azure.identity.ManagedIdentityCredentialBuilder; @@ -505,12 +507,27 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor implem private final Consumer errorProcessor = errorContext -> { final PartitionContext partitionContext = errorContext.getPartitionContext(); + final Throwable throwable = errorContext.getThrowable(); + + if (throwable instanceof AmqpException) { + final AmqpException amqpException = (AmqpException) throwable; + if (amqpException.getErrorCondition() == AmqpErrorCondition.LINK_STOLEN) { + getLogger().info("Partition was stolen by another consumer instance from the consumer group. Namespace [{}] Event Hub [{}] Consumer Group [{}] Partition [{}]. {}", + partitionContext.getFullyQualifiedNamespace(), + partitionContext.getEventHubName(), + partitionContext.getConsumerGroup(), + partitionContext.getPartitionId(), + amqpException.getMessage()); + return; + } + } + getLogger().error("Receive Events failed Namespace [{}] Event Hub [{}] Consumer Group [{}] Partition [{}]", partitionContext.getFullyQualifiedNamespace(), partitionContext.getEventHubName(), partitionContext.getConsumerGroup(), partitionContext.getPartitionId(), - errorContext.getThrowable() + throwable ); };