NIFI-12412 Support Proxies for Blob Checkpoints in ConsumeAzureEventHub

This closes #8107

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Peter Turcsanyi 2023-12-03 21:35:03 +01:00 committed by exceptionfactory
parent d0dd4e03e0
commit 8645088e1d
No known key found for this signature in database
GPG Key ID: 29B6A52D2AAE8DBA
1 changed files with 10 additions and 3 deletions

View File

@ -20,6 +20,8 @@ import com.azure.core.amqp.AmqpTransportType;
import com.azure.core.amqp.exception.AmqpErrorCondition;
import com.azure.core.amqp.exception.AmqpException;
import com.azure.core.credential.AzureNamedKeyCredential;
import com.azure.core.http.ProxyOptions;
import com.azure.core.util.HttpClientOptions;
import com.azure.identity.ManagedIdentityCredential;
import com.azure.identity.ManagedIdentityCredentialBuilder;
import com.azure.messaging.eventhubs.EventData;
@ -58,6 +60,7 @@ import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.azure.eventhub.position.EarliestEventPositionProvider;
import org.apache.nifi.processors.azure.eventhub.position.LegacyBlobStorageEventPositionProvider;
import org.apache.nifi.processors.azure.eventhub.utils.AzureEventHubUtils;
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriter;
@ -413,10 +416,14 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor implem
final String containerName = defaultIfBlank(context.getProperty(STORAGE_CONTAINER_NAME).evaluateAttributeExpressions().getValue(), eventHubName);
final String storageConnectionString = createStorageConnectionString(context);
final BlobContainerAsyncClient blobContainerAsyncClient = new BlobContainerClientBuilder()
final BlobContainerClientBuilder blobContainerClientBuilder = new BlobContainerClientBuilder()
.connectionString(storageConnectionString)
.containerName(containerName)
.buildAsyncClient();
.containerName(containerName);
final ProxyOptions storageProxyOptions = AzureStorageUtils.getProxyOptions(context);
if (storageProxyOptions != null) {
blobContainerClientBuilder.clientOptions(new HttpClientOptions().setProxyOptions(storageProxyOptions));
}
final BlobContainerAsyncClient blobContainerAsyncClient = blobContainerClientBuilder.buildAsyncClient();
final BlobCheckpointStore checkpointStore = new BlobCheckpointStore(blobContainerAsyncClient);
final Long receiveTimeout = context.getProperty(RECEIVE_TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS);