NIFI-12306 ConsumeAzureEventHub logs partition ownership changes at info level

This closes #7970

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Peter Turcsanyi 2023-11-01 17:09:57 +01:00 committed by exceptionfactory
parent 5bcad9eef3
commit 3e8db53f88
No known key found for this signature in database
GPG Key ID: 29B6A52D2AAE8DBA
1 changed files with 18 additions and 1 deletions

View File

@ -17,6 +17,8 @@
package org.apache.nifi.processors.azure.eventhub;
import com.azure.core.amqp.AmqpTransportType;
import com.azure.core.amqp.exception.AmqpErrorCondition;
import com.azure.core.amqp.exception.AmqpException;
import com.azure.core.credential.AzureNamedKeyCredential;
import com.azure.identity.ManagedIdentityCredential;
import com.azure.identity.ManagedIdentityCredentialBuilder;
@ -505,12 +507,27 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor implem
private final Consumer<ErrorContext> errorProcessor = errorContext -> {
final PartitionContext partitionContext = errorContext.getPartitionContext();
final Throwable throwable = errorContext.getThrowable();
if (throwable instanceof AmqpException) {
final AmqpException amqpException = (AmqpException) throwable;
if (amqpException.getErrorCondition() == AmqpErrorCondition.LINK_STOLEN) {
getLogger().info("Partition was stolen by another consumer instance from the consumer group. Namespace [{}] Event Hub [{}] Consumer Group [{}] Partition [{}]. {}",
partitionContext.getFullyQualifiedNamespace(),
partitionContext.getEventHubName(),
partitionContext.getConsumerGroup(),
partitionContext.getPartitionId(),
amqpException.getMessage());
return;
}
}
getLogger().error("Receive Events failed Namespace [{}] Event Hub [{}] Consumer Group [{}] Partition [{}]",
partitionContext.getFullyQualifiedNamespace(),
partitionContext.getEventHubName(),
partitionContext.getConsumerGroup(),
partitionContext.getPartitionId(),
errorContext.getThrowable()
throwable
);
};