diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java index 33b80f0daa..fe182e55db 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java @@ -20,6 +20,8 @@ import com.azure.core.amqp.AmqpTransportType; import com.azure.core.amqp.exception.AmqpErrorCondition; import com.azure.core.amqp.exception.AmqpException; import com.azure.core.credential.AzureNamedKeyCredential; +import com.azure.core.http.ProxyOptions; +import com.azure.core.util.HttpClientOptions; import com.azure.identity.ManagedIdentityCredential; import com.azure.identity.ManagedIdentityCredentialBuilder; import com.azure.messaging.eventhubs.EventData; @@ -58,6 +60,7 @@ import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processors.azure.eventhub.position.EarliestEventPositionProvider; import org.apache.nifi.processors.azure.eventhub.position.LegacyBlobStorageEventPositionProvider; import org.apache.nifi.processors.azure.eventhub.utils.AzureEventHubUtils; +import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils; import org.apache.nifi.serialization.RecordReader; import org.apache.nifi.serialization.RecordReaderFactory; import org.apache.nifi.serialization.RecordSetWriter; @@ -413,10 +416,14 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor implem final String containerName = defaultIfBlank(context.getProperty(STORAGE_CONTAINER_NAME).evaluateAttributeExpressions().getValue(), eventHubName); final String storageConnectionString = createStorageConnectionString(context); - final BlobContainerAsyncClient blobContainerAsyncClient = new BlobContainerClientBuilder() + final BlobContainerClientBuilder blobContainerClientBuilder = new BlobContainerClientBuilder() .connectionString(storageConnectionString) - .containerName(containerName) - .buildAsyncClient(); + .containerName(containerName); + final ProxyOptions storageProxyOptions = AzureStorageUtils.getProxyOptions(context); + if (storageProxyOptions != null) { + blobContainerClientBuilder.clientOptions(new HttpClientOptions().setProxyOptions(storageProxyOptions)); + } + final BlobContainerAsyncClient blobContainerAsyncClient = blobContainerClientBuilder.buildAsyncClient(); final BlobCheckpointStore checkpointStore = new BlobCheckpointStore(blobContainerAsyncClient); final Long receiveTimeout = context.getProperty(RECEIVE_TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS);