From f66540eb6de45c513d0138f073ed210c73688932 Mon Sep 17 00:00:00 2001 From: Nandor Soma Abonyi Date: Wed, 20 Apr 2022 15:05:05 +0200 Subject: [PATCH] NIFI-9951 Add proxy support to Azure ADLS and Blob v12 processors This closes #5990. Signed-off-by: Peter Turcsanyi --- .../nifi-azure-processors/pom.xml | 6 ++ .../azure/AbstractAzureBlobProcessor_v12.java | 18 +++-- ...AbstractAzureDataLakeStorageProcessor.java | 52 ++++++-------- .../storage/DeleteAzureBlobStorage_v12.java | 9 +-- .../storage/DeleteAzureDataLakeStorage.java | 4 +- .../storage/FetchAzureBlobStorage_v12.java | 13 ++-- .../storage/FetchAzureDataLakeStorage.java | 21 ++++-- .../storage/ListAzureBlobStorage_v12.java | 3 +- .../storage/ListAzureDataLakeStorage.java | 4 +- .../storage/MoveAzureDataLakeStorage.java | 4 +- .../storage/PutAzureBlobStorage_v12.java | 17 ++--- .../storage/PutAzureDataLakeStorage.java | 22 +++--- .../storage/utils/AzureStorageUtils.java | 44 ++++++++++++ .../azure/storage/AbstractAzureStorageIT.java | 71 ++++++++++++++++--- .../storage/ITDeleteAzureBlobStorage_v12.java | 11 +++ .../storage/ITDeleteAzureDataLakeStorage.java | 18 +++++ .../storage/ITFetchAzureBlobStorage_v12.java | 11 +++ .../storage/ITFetchAzureDataLakeStorage.java | 29 ++++++-- .../storage/ITListAzureBlobStorage_v12.java | 11 +++ .../storage/ITListAzureDataLakeStorage.java | 12 +++- .../storage/ITMoveAzureDataLakeStorage.java | 11 +++ .../storage/ITPutAzureBlobStorage_v12.java | 9 +++ .../storage/ITPutAzureDataLakeStorage.java | 14 +++- .../apache/nifi/proxy/ProxyConfiguration.java | 15 +++- .../org/apache/nifi/proxy/SocksVersion.java | 22 ++++++ .../StandardProxyConfigurationService.java | 26 +++++-- 26 files changed, 377 insertions(+), 100 deletions(-) create mode 100644 nifi-nar-bundles/nifi-standard-services/nifi-proxy-configuration-api/src/main/java/org/apache/nifi/proxy/SocksVersion.java diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml index 0a1ebcfa70..45bf13c8c0 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml @@ -188,6 +188,12 @@ nifi-distributed-cache-client-service-api test + + org.apache.nifi + nifi-proxy-configuration + 1.17.0-SNAPSHOT + test + diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureBlobProcessor_v12.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureBlobProcessor_v12.java index 91e44206c3..991409d5b8 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureBlobProcessor_v12.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureBlobProcessor_v12.java @@ -18,6 +18,8 @@ package org.apache.nifi.processors.azure; import com.azure.core.credential.AzureSasCredential; import com.azure.core.credential.TokenCredential; +import com.azure.core.http.HttpClient; +import com.azure.core.http.netty.NettyAsyncHttpClientBuilder; import com.azure.identity.ClientSecretCredentialBuilder; import com.azure.identity.ManagedIdentityCredentialBuilder; import com.azure.storage.blob.BlobClient; @@ -45,6 +47,7 @@ import java.util.HashSet; import java.util.Map; import java.util.Set; +import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.getProxyOptions; import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_NAME_BLOBNAME; import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_NAME_BLOBTYPE; import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_NAME_CONTAINER; @@ -110,12 +113,19 @@ public abstract class AbstractAzureBlobProcessor_v12 extends AbstractProcessor { } public static BlobServiceClient createStorageClient(PropertyContext context) { - AzureStorageCredentialsService_v12 credentialsService = context.getProperty(STORAGE_CREDENTIALS_SERVICE).asControllerService(AzureStorageCredentialsService_v12.class); - AzureStorageCredentialsDetails_v12 credentialsDetails = credentialsService.getCredentialsDetails(); + final AzureStorageCredentialsService_v12 credentialsService = context.getProperty(STORAGE_CREDENTIALS_SERVICE).asControllerService(AzureStorageCredentialsService_v12.class); + final AzureStorageCredentialsDetails_v12 credentialsDetails = credentialsService.getCredentialsDetails(); - BlobServiceClientBuilder clientBuilder = new BlobServiceClientBuilder(); + final BlobServiceClientBuilder clientBuilder = new BlobServiceClientBuilder(); clientBuilder.endpoint(String.format("https://%s.%s", credentialsDetails.getAccountName(), credentialsDetails.getEndpointSuffix())); + final NettyAsyncHttpClientBuilder nettyClientBuilder = new NettyAsyncHttpClientBuilder(); + + nettyClientBuilder.proxy(getProxyOptions(context)); + + final HttpClient nettyClient = nettyClientBuilder.build(); + clientBuilder.httpClient(nettyClient); + configureCredential(clientBuilder, credentialsService, credentialsDetails); return clientBuilder.buildClient(); @@ -131,7 +141,7 @@ public abstract class AbstractAzureBlobProcessor_v12 extends AbstractProcessor { break; case MANAGED_IDENTITY: clientBuilder.credential(new ManagedIdentityCredentialBuilder() - .clientId(credentialsDetails.getManagedIdentityClientId()) + .clientId(credentialsDetails.getManagedIdentityClientId()) .build()); break; case SERVICE_PRINCIPAL: diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureDataLakeStorageProcessor.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureDataLakeStorageProcessor.java index 39e2ad28bc..3047de5f6e 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureDataLakeStorageProcessor.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureDataLakeStorageProcessor.java @@ -18,6 +18,8 @@ package org.apache.nifi.processors.azure; import com.azure.core.credential.AccessToken; import com.azure.core.credential.TokenCredential; +import com.azure.core.http.HttpClient; +import com.azure.core.http.netty.NettyAsyncHttpClientBuilder; import com.azure.identity.ClientSecretCredential; import com.azure.identity.ClientSecretCredentialBuilder; import com.azure.identity.ManagedIdentityCredential; @@ -45,11 +47,11 @@ import reactor.core.publisher.Mono; import java.util.Arrays; import java.util.Collections; import java.util.HashSet; -import java.util.List; import java.util.Map; import java.util.Set; import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_FILENAME; +import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.getProxyOptions; public abstract class AbstractAzureDataLakeStorageProcessor extends AbstractProcessor { @@ -94,23 +96,11 @@ public abstract class AbstractAzureDataLakeStorageProcessor extends AbstractProc "Files that could not be written to Azure storage for some reason are transferred to this relationship") .build(); - private static final List PROPERTIES = Collections.unmodifiableList(Arrays.asList( - ADLS_CREDENTIALS_SERVICE, - FILESYSTEM, - DIRECTORY, - FILE - )); - private static final Set RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList( REL_SUCCESS, REL_FAILURE ))); - @Override - protected List getSupportedPropertyDescriptors() { - return PROPERTIES; - } - @Override public Set getRelationships() { return RELATIONSHIPS; @@ -136,43 +126,41 @@ public abstract class AbstractAzureDataLakeStorageProcessor extends AbstractProc final String endpoint = String.format("https://%s.%s", accountName, endpointSuffix); - final DataLakeServiceClient storageClient; + final DataLakeServiceClientBuilder dataLakeServiceClientBuilder = new DataLakeServiceClientBuilder(); + dataLakeServiceClientBuilder.endpoint(endpoint); + if (StringUtils.isNotBlank(accountKey)) { - final StorageSharedKeyCredential credential = new StorageSharedKeyCredential(accountName, - accountKey); - storageClient = new DataLakeServiceClientBuilder().endpoint(endpoint).credential(credential) - .buildClient(); + final StorageSharedKeyCredential credential = new StorageSharedKeyCredential(accountName, accountKey); + dataLakeServiceClientBuilder.credential(credential); } else if (StringUtils.isNotBlank(sasToken)) { - storageClient = new DataLakeServiceClientBuilder().endpoint(endpoint).sasToken(sasToken) - .buildClient(); + dataLakeServiceClientBuilder.sasToken(sasToken); } else if (accessToken != null) { final TokenCredential credential = tokenRequestContext -> Mono.just(accessToken); - - storageClient = new DataLakeServiceClientBuilder().endpoint(endpoint).credential(credential) - .buildClient(); + dataLakeServiceClientBuilder.credential(credential); } else if (useManagedIdentity) { final ManagedIdentityCredential misCredential = new ManagedIdentityCredentialBuilder() .clientId(managedIdentityClientId) .build(); - storageClient = new DataLakeServiceClientBuilder() - .endpoint(endpoint) - .credential(misCredential) - .buildClient(); + dataLakeServiceClientBuilder.credential(misCredential); } else if (StringUtils.isNoneBlank(servicePrincipalTenantId, servicePrincipalClientId, servicePrincipalClientSecret)) { final ClientSecretCredential credential = new ClientSecretCredentialBuilder() .tenantId(servicePrincipalTenantId) .clientId(servicePrincipalClientId) .clientSecret(servicePrincipalClientSecret) .build(); - - storageClient = new DataLakeServiceClientBuilder() - .endpoint(endpoint) - .credential(credential) - .buildClient(); + dataLakeServiceClientBuilder.credential(credential); } else { throw new IllegalArgumentException("No valid credentials were provided"); } + final NettyAsyncHttpClientBuilder nettyClientBuilder = new NettyAsyncHttpClientBuilder(); + nettyClientBuilder.proxy(getProxyOptions(context)); + + final HttpClient nettyClient = nettyClientBuilder.build(); + dataLakeServiceClientBuilder.httpClient(nettyClient); + + final DataLakeServiceClient storageClient = dataLakeServiceClientBuilder.buildClient(); + return storageClient; } diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureBlobStorage_v12.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureBlobStorage_v12.java index a6b056feb5..1ba5f7c7d7 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureBlobStorage_v12.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureBlobStorage_v12.java @@ -40,8 +40,8 @@ import java.util.Collections; import java.util.List; import java.util.concurrent.TimeUnit; -@Tags({ "azure", "microsoft", "cloud", "storage", "blob" }) -@SeeAlso({ ListAzureBlobStorage_v12.class, FetchAzureBlobStorage_v12.class, PutAzureBlobStorage_v12.class}) +@Tags({"azure", "microsoft", "cloud", "storage", "blob"}) +@SeeAlso({ListAzureBlobStorage_v12.class, FetchAzureBlobStorage_v12.class, PutAzureBlobStorage_v12.class}) @CapabilityDescription("Deletes the specified blob from Azure Blob Storage. The processor uses Azure Blob Storage client library v12.") @InputRequirement(Requirement.INPUT_REQUIRED) public class DeleteAzureBlobStorage_v12 extends AbstractAzureBlobProcessor_v12 { @@ -66,11 +66,12 @@ public class DeleteAzureBlobStorage_v12 extends AbstractAzureBlobProcessor_v12 { STORAGE_CREDENTIALS_SERVICE, AzureStorageUtils.CONTAINER, BLOB_NAME, - DELETE_SNAPSHOTS_OPTION + DELETE_SNAPSHOTS_OPTION, + AzureStorageUtils.PROXY_CONFIGURATION_SERVICE )); @Override - public List getSupportedPropertyDescriptors() { + protected List getSupportedPropertyDescriptors() { return PROPERTIES; } diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureDataLakeStorage.java index b929971e8e..cf1ef18f77 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureDataLakeStorage.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureDataLakeStorage.java @@ -36,6 +36,7 @@ import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor; +import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils; import java.time.Duration; import java.util.Arrays; @@ -77,7 +78,8 @@ public class DeleteAzureDataLakeStorage extends AbstractAzureDataLakeStorageProc FILESYSTEM, FILESYSTEM_OBJECT_TYPE, DIRECTORY, - FILE + FILE, + AzureStorageUtils.PROXY_CONFIGURATION_SERVICE )); @Override diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage_v12.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage_v12.java index a12a8110e3..ac6dcae8b3 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage_v12.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage_v12.java @@ -63,11 +63,11 @@ import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_NAME_PRIMARY_URI; import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_NAME_TIMESTAMP; -@Tags({ "azure", "microsoft", "cloud", "storage", "blob" }) +@Tags({"azure", "microsoft", "cloud", "storage", "blob"}) @CapabilityDescription("Retrieves the specified blob from Azure Blob Storage and writes its content to the content of the FlowFile. The processor uses Azure Blob Storage client library v12.") -@SeeAlso({ ListAzureBlobStorage_v12.class, PutAzureBlobStorage_v12.class, DeleteAzureBlobStorage_v12.class }) +@SeeAlso({ListAzureBlobStorage_v12.class, PutAzureBlobStorage_v12.class, DeleteAzureBlobStorage_v12.class}) @InputRequirement(Requirement.INPUT_REQUIRED) -@WritesAttributes({ @WritesAttribute(attribute = ATTR_NAME_CONTAINER, description = ATTR_DESCRIPTION_CONTAINER), +@WritesAttributes({@WritesAttribute(attribute = ATTR_NAME_CONTAINER, description = ATTR_DESCRIPTION_CONTAINER), @WritesAttribute(attribute = ATTR_NAME_BLOBNAME, description = ATTR_DESCRIPTION_BLOBNAME), @WritesAttribute(attribute = ATTR_NAME_PRIMARY_URI, description = ATTR_DESCRIPTION_PRIMARY_URI), @WritesAttribute(attribute = ATTR_NAME_ETAG, description = ATTR_DESCRIPTION_ETAG), @@ -75,7 +75,7 @@ import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR @WritesAttribute(attribute = ATTR_NAME_MIME_TYPE, description = ATTR_DESCRIPTION_MIME_TYPE), @WritesAttribute(attribute = ATTR_NAME_LANG, description = ATTR_DESCRIPTION_LANG), @WritesAttribute(attribute = ATTR_NAME_TIMESTAMP, description = ATTR_DESCRIPTION_TIMESTAMP), - @WritesAttribute(attribute = ATTR_NAME_LENGTH, description = ATTR_DESCRIPTION_LENGTH) }) + @WritesAttribute(attribute = ATTR_NAME_LENGTH, description = ATTR_DESCRIPTION_LENGTH)}) public class FetchAzureBlobStorage_v12 extends AbstractAzureBlobProcessor_v12 { public static final PropertyDescriptor CONTAINER = new PropertyDescriptor.Builder() @@ -113,11 +113,12 @@ public class FetchAzureBlobStorage_v12 extends AbstractAzureBlobProcessor_v12 { CONTAINER, BLOB_NAME, RANGE_START, - RANGE_LENGTH + RANGE_LENGTH, + AzureStorageUtils.PROXY_CONFIGURATION_SERVICE )); @Override - public List getSupportedPropertyDescriptors() { + protected List getSupportedPropertyDescriptors() { return PROPERTIES; } diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureDataLakeStorage.java index 0e98858e16..a8c41e836b 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureDataLakeStorage.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureDataLakeStorage.java @@ -37,8 +37,10 @@ import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor; +import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils; -import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.concurrent.TimeUnit; @@ -78,13 +80,20 @@ public class FetchAzureDataLakeStorage extends AbstractAzureDataLakeStorageProce .defaultValue("0") .build(); + private static final List PROPERTIES = Collections.unmodifiableList(Arrays.asList( + ADLS_CREDENTIALS_SERVICE, + FILESYSTEM, + DIRECTORY, + FILE, + RANGE_START, + RANGE_LENGTH, + NUM_RETRIES, + AzureStorageUtils.PROXY_CONFIGURATION_SERVICE + )); + @Override public List getSupportedPropertyDescriptors() { - List properties = new ArrayList(super.getSupportedPropertyDescriptors()); - properties.add(RANGE_START); - properties.add(RANGE_LENGTH); - properties.add(NUM_RETRIES); - return properties; + return PROPERTIES; } @Override diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage_v12.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage_v12.java index 483a3c9314..9da7375ad4 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage_v12.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage_v12.java @@ -141,7 +141,8 @@ public class ListAzureBlobStorage_v12 extends AbstractListAzureProcessor LISTING_RESET_PROPERTIES = Collections.unmodifiableSet(new HashSet<>(Arrays.asList( ADLS_CREDENTIALS_SERVICE, diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/MoveAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/MoveAzureDataLakeStorage.java index cb3d8b6669..c5b38becf6 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/MoveAzureDataLakeStorage.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/MoveAzureDataLakeStorage.java @@ -36,6 +36,7 @@ import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor; +import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils; import java.util.Arrays; import java.util.Collections; @@ -131,7 +132,8 @@ public class MoveAzureDataLakeStorage extends AbstractAzureDataLakeStorageProces DESTINATION_FILESYSTEM, DESTINATION_DIRECTORY, FILE, - CONFLICT_RESOLUTION + CONFLICT_RESOLUTION, + AzureStorageUtils.PROXY_CONFIGURATION_SERVICE )); @Override diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java index 85fb00b29e..805832ecd8 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java @@ -63,11 +63,11 @@ import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_NAME_PRIMARY_URI; import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_NAME_TIMESTAMP; -@Tags({ "azure", "microsoft", "cloud", "storage", "blob" }) -@SeeAlso({ ListAzureBlobStorage_v12.class, FetchAzureBlobStorage_v12.class, DeleteAzureBlobStorage_v12.class }) +@Tags({"azure", "microsoft", "cloud", "storage", "blob"}) +@SeeAlso({ListAzureBlobStorage_v12.class, FetchAzureBlobStorage_v12.class, DeleteAzureBlobStorage_v12.class}) @CapabilityDescription("Puts content into a blob on Azure Blob Storage. The processor uses Azure Blob Storage client library v12.") @InputRequirement(Requirement.INPUT_REQUIRED) -@WritesAttributes({ @WritesAttribute(attribute = ATTR_NAME_CONTAINER, description = ATTR_DESCRIPTION_CONTAINER), +@WritesAttributes({@WritesAttribute(attribute = ATTR_NAME_CONTAINER, description = ATTR_DESCRIPTION_CONTAINER), @WritesAttribute(attribute = ATTR_NAME_BLOBNAME, description = ATTR_DESCRIPTION_BLOBNAME), @WritesAttribute(attribute = ATTR_NAME_PRIMARY_URI, description = ATTR_DESCRIPTION_PRIMARY_URI), @WritesAttribute(attribute = ATTR_NAME_ETAG, description = ATTR_DESCRIPTION_ETAG), @@ -75,7 +75,7 @@ import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR @WritesAttribute(attribute = ATTR_NAME_MIME_TYPE, description = ATTR_DESCRIPTION_MIME_TYPE), @WritesAttribute(attribute = ATTR_NAME_LANG, description = ATTR_DESCRIPTION_LANG), @WritesAttribute(attribute = ATTR_NAME_TIMESTAMP, description = ATTR_DESCRIPTION_TIMESTAMP), - @WritesAttribute(attribute = ATTR_NAME_LENGTH, description = ATTR_DESCRIPTION_LENGTH) }) + @WritesAttribute(attribute = ATTR_NAME_LENGTH, description = ATTR_DESCRIPTION_LENGTH)}) public class PutAzureBlobStorage_v12 extends AbstractAzureBlobProcessor_v12 { public static final PropertyDescriptor CREATE_CONTAINER = new PropertyDescriptor.Builder() @@ -87,19 +87,20 @@ public class PutAzureBlobStorage_v12 extends AbstractAzureBlobProcessor_v12 { .allowableValues("true", "false") .defaultValue("false") .description("Specifies whether to check if the container exists and to automatically create it if it does not. " + - "Permission to list containers is required. If false, this check is not made, but the Put operation " + - "will fail if the container does not exist.") + "Permission to list containers is required. If false, this check is not made, but the Put operation " + + "will fail if the container does not exist.") .build(); private static final List PROPERTIES = Collections.unmodifiableList(Arrays.asList( STORAGE_CREDENTIALS_SERVICE, AzureStorageUtils.CONTAINER, CREATE_CONTAINER, - BLOB_NAME + BLOB_NAME, + AzureStorageUtils.PROXY_CONFIGURATION_SERVICE )); @Override - public List getSupportedPropertyDescriptors() { + protected List getSupportedPropertyDescriptors() { return PROPERTIES; } diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java index 9ecbbcc032..3c10d068b0 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java @@ -33,13 +33,13 @@ import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; -import org.apache.nifi.processor.ProcessorInitializationContext; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor; +import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils; import java.io.BufferedInputStream; import java.io.InputStream; -import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -83,18 +83,18 @@ public class PutAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcess .allowableValues(FAIL_RESOLUTION, REPLACE_RESOLUTION, IGNORE_RESOLUTION) .build(); - private List properties; - - @Override - protected void init(final ProcessorInitializationContext context) { - final List props = new ArrayList<>(super.getSupportedPropertyDescriptors()); - props.add(CONFLICT_RESOLUTION); - properties = Collections.unmodifiableList(props); - } + private static final List PROPERTIES = Collections.unmodifiableList(Arrays.asList( + ADLS_CREDENTIALS_SERVICE, + FILESYSTEM, + DIRECTORY, + FILE, + CONFLICT_RESOLUTION, + AzureStorageUtils.PROXY_CONFIGURATION_SERVICE + )); @Override protected List getSupportedPropertyDescriptors() { - return properties; + return PROPERTIES; } @Override diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/AzureStorageUtils.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/AzureStorageUtils.java index a73c108acc..654db1469f 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/AzureStorageUtils.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/AzureStorageUtils.java @@ -16,6 +16,9 @@ */ package org.apache.nifi.processors.azure.storage.utils; +import com.azure.core.http.ProxyOptions; +import java.net.InetSocketAddress; +import java.net.Proxy; import java.net.URI; import java.net.URISyntaxException; import java.util.ArrayList; @@ -43,9 +46,11 @@ import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.proxy.ProxyConfiguration; import org.apache.nifi.proxy.ProxySpec; +import org.apache.nifi.proxy.SocksVersion; import org.apache.nifi.services.azure.storage.AzureStorageCredentialsDetails; import org.apache.nifi.services.azure.storage.AzureStorageCredentialsService; import org.apache.nifi.services.azure.storage.AzureStorageEmulatorCredentialsDetails; +import reactor.netty.http.client.HttpClient; public final class AzureStorageUtils { public static final String BLOCK = "Block"; @@ -311,4 +316,43 @@ public final class AzureStorageUtils { final ProxyConfiguration proxyConfig = ProxyConfiguration.getConfiguration(processContext); operationContext.setProxy(proxyConfig.createProxy()); } + + /** + * + * Creates the {@link ProxyOptions proxy options} that {@link HttpClient} will use. + * + * @param propertyContext is sed to supply Proxy configurations + * @return {@link ProxyOptions proxy options}, null if Proxy is not set + */ + public static ProxyOptions getProxyOptions(final PropertyContext propertyContext) { + final ProxyConfiguration proxyConfiguration = ProxyConfiguration.getConfiguration(propertyContext); + + if (proxyConfiguration != ProxyConfiguration.DIRECT_CONFIGURATION) { + + final ProxyOptions proxyOptions = new ProxyOptions( + getProxyType(proxyConfiguration), + new InetSocketAddress(proxyConfiguration.getProxyServerHost(), proxyConfiguration.getProxyServerPort())); + + final String proxyUserName = proxyConfiguration.getProxyUserName(); + final String proxyUserPassword = proxyConfiguration.getProxyUserPassword(); + if (proxyUserName != null && proxyUserPassword != null) { + proxyOptions.setCredentials(proxyUserName, proxyUserPassword); + } + + return proxyOptions; + } + + return null; + } + + private static ProxyOptions.Type getProxyType(ProxyConfiguration proxyConfiguration) { + if (proxyConfiguration.getProxyType() == Proxy.Type.HTTP) { + return ProxyOptions.Type.HTTP; + } else if (proxyConfiguration.getProxyType() == Proxy.Type.SOCKS) { + final SocksVersion socksVersion = proxyConfiguration.getSocksVersion(); + return ProxyOptions.Type.valueOf(socksVersion.name()); + } else { + throw new IllegalArgumentException("Unsupported proxy type: " + proxyConfiguration.getProxyType()); + } + } } diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureStorageIT.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureStorageIT.java index 6dc3bd873a..ab50166c63 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureStorageIT.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureStorageIT.java @@ -21,12 +21,13 @@ import com.microsoft.azure.storage.StorageCredentials; import com.microsoft.azure.storage.StorageCredentialsAccountAndKey; import org.apache.nifi.processor.Processor; import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils; +import org.apache.nifi.proxy.StandardProxyConfigurationService; +import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.services.azure.storage.AzureStorageCredentialsControllerService; import org.apache.nifi.services.azure.storage.AzureStorageCredentialsService; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; import org.apache.nifi.util.file.FileUtils; - import org.junit.jupiter.api.BeforeEach; import java.io.FileInputStream; @@ -36,32 +37,66 @@ import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; public abstract class AbstractAzureStorageIT { - private static final Properties CONFIG; + private static final Properties CREDENTIALS_CONFIG; + private static final Properties PROXY_CONFIG; private static final String CREDENTIALS_FILE = System.getProperty("user.home") + "/azure-credentials.PROPERTIES"; + private static final String PROXY_CONFIGURATION_FILE = System.getProperty("user.home") + "/proxy-configuration.PROPERTIES"; static { - CONFIG = new Properties(); + CREDENTIALS_CONFIG = loadConfig(CREDENTIALS_FILE); + PROXY_CONFIG = loadConfig(PROXY_CONFIGURATION_FILE); + } + + private static Properties loadConfig(String configPath) { + Properties loadedProperties = new Properties(); + assertDoesNotThrow(() -> { - final FileInputStream fis = new FileInputStream(CREDENTIALS_FILE); - assertDoesNotThrow(() -> CONFIG.load(fis)); - FileUtils.closeQuietly(fis); + final FileInputStream fIS = new FileInputStream(configPath); + assertDoesNotThrow(() -> loadedProperties.load(fIS)); + FileUtils.closeQuietly(fIS); }); + + return loadedProperties; } protected String getAccountName() { - return CONFIG.getProperty("accountName"); + return CREDENTIALS_CONFIG.getProperty("accountName"); } protected String getAccountKey() { - return CONFIG.getProperty("accountKey"); + return CREDENTIALS_CONFIG.getProperty("accountKey"); } protected String getEndpointSuffix() { - String endpointSuffix = CONFIG.getProperty("endpointSuffix"); + String endpointSuffix = CREDENTIALS_CONFIG.getProperty("endpointSuffix"); return endpointSuffix != null ? endpointSuffix : getDefaultEndpointSuffix(); } + protected String getProxyType() { + return PROXY_CONFIG.getProperty("proxyType"); + } + + protected String getSocksVersion() { + return PROXY_CONFIG.getProperty("socksVersion"); + } + + protected String getProxyServerHost() { + return PROXY_CONFIG.getProperty("proxyServerHost"); + } + + protected String getProxyServerPort() { + return PROXY_CONFIG.getProperty("proxyServerPort"); + } + + protected String getProxyUsername() { + return PROXY_CONFIG.getProperty("proxyUsername"); + } + + protected String getProxyUserPassword() { + return PROXY_CONFIG.getProperty("proxyUserPassword"); + } + protected abstract String getDefaultEndpointSuffix(); protected TestRunner runner; @@ -102,4 +137,22 @@ public abstract class AbstractAzureStorageIT { runner.setProperty(AzureStorageUtils.STORAGE_CREDENTIALS_SERVICE, credentialsService.getIdentifier()); } + + protected void configureProxyService() throws InitializationException { + final StandardProxyConfigurationService proxyConfigurationService = new StandardProxyConfigurationService(); + runner.addControllerService("proxy-configuration-service", proxyConfigurationService); + + runner.setProperty(proxyConfigurationService, StandardProxyConfigurationService.PROXY_TYPE, getProxyType()); + runner.setProperty(proxyConfigurationService, StandardProxyConfigurationService.SOCKS_VERSION, getSocksVersion()); + runner.setProperty(proxyConfigurationService, StandardProxyConfigurationService.PROXY_SERVER_HOST, getProxyServerHost()); + runner.setProperty(proxyConfigurationService, StandardProxyConfigurationService.PROXY_SERVER_PORT, getProxyServerPort()); + runner.setProperty(proxyConfigurationService, StandardProxyConfigurationService.PROXY_USER_NAME, getProxyUsername()); + runner.setProperty(proxyConfigurationService, StandardProxyConfigurationService.PROXY_USER_PASSWORD, getProxyUserPassword()); + + runner.assertValid(proxyConfigurationService); + + runner.enableControllerService(proxyConfigurationService); + + runner.setProperty(AzureStorageUtils.PROXY_CONFIGURATION_SERVICE, proxyConfigurationService.getIdentifier()); + } } diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITDeleteAzureBlobStorage_v12.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITDeleteAzureBlobStorage_v12.java index 50a8d0bc52..2d7e4f13c4 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITDeleteAzureBlobStorage_v12.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITDeleteAzureBlobStorage_v12.java @@ -56,6 +56,17 @@ public class ITDeleteAzureBlobStorage_v12 extends AbstractAzureBlobStorage_v12IT assertSuccess(BLOB_NAME); } + @Test + public void testDeleteBlobWithSimpleNameUsingProxyConfigurationService() throws Exception { + uploadBlob(BLOB_NAME, BLOB_DATA); + + configureProxyService(); + + runProcessor(); + + assertSuccess(BLOB_NAME); + } + @Test public void testDeleteBlobWithCompoundName() throws Exception { String blobName = "dir1/dir2/blob1"; diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITDeleteAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITDeleteAzureDataLakeStorage.java index ac6c75a29d..0c7864d963 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITDeleteAzureDataLakeStorage.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITDeleteAzureDataLakeStorage.java @@ -22,6 +22,7 @@ import com.azure.storage.file.datalake.models.DataLakeStorageException; import org.apache.nifi.processor.Processor; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.provenance.ProvenanceEventType; +import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.util.MockFlowFile; import org.junit.jupiter.api.Test; @@ -57,6 +58,23 @@ public class ITDeleteAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT testSuccessfulDelete(fileSystemName, directory, null, inputFlowFileContent, inputFlowFileContent); } + @Test + public void testDeleteDirectoryWithFilesUsingProxyConfigurationService() throws InitializationException { + // GIVEN + String directory = "TestDirectory"; + String filename = "testFile.txt"; + String fileContent = "AzureFileContent"; + String inputFlowFileContent = "InputFlowFileContent"; + + createDirectoryAndUploadFile(directory, filename, fileContent); + + configureProxyService(); + + // WHEN + // THEN + testSuccessfulDelete(fileSystemName, directory, null, inputFlowFileContent, inputFlowFileContent); + } + @Test public void testDeleteEmptyDirectoryWithFSTypeDirectory() { // GIVEN diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureBlobStorage_v12.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureBlobStorage_v12.java index 64f2de3c04..4218b6c726 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureBlobStorage_v12.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureBlobStorage_v12.java @@ -53,6 +53,17 @@ public class ITFetchAzureBlobStorage_v12 extends AbstractAzureBlobStorage_v12IT assertSuccess(BLOB_NAME, BLOB_DATA); } + @Test + public void testFetchBlobWithSimpleNameUsingProxyConfigurationService() throws Exception { + uploadBlob(BLOB_NAME, BLOB_DATA); + + configureProxyService(); + + runProcessor(); + + assertSuccess(BLOB_NAME, BLOB_DATA); + } + @Test public void testFetchBlobWithCompoundName() throws Exception { String blobName = "dir1/dir2/blob1"; diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureDataLakeStorage.java index 1b665d4109..6d50691c6f 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureDataLakeStorage.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureDataLakeStorage.java @@ -21,6 +21,7 @@ import org.apache.nifi.processor.Processor; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.provenance.ProvenanceEventRecord; import org.apache.nifi.provenance.ProvenanceEventType; +import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.util.MockFlowFile; import org.junit.jupiter.api.Test; @@ -56,6 +57,22 @@ public class ITFetchAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT testSuccessfulFetch(fileSystemName, directory, filename, inputFlowFileContent, TEST_FILE_CONTENT); } + @Test + public void testFetchFileFromDirectoryUsingProxyConfigurationService() throws InitializationException { + // GIVEN + String directory = "TestDirectory"; + String filename = "testFile.txt"; + String inputFlowFileContent = "InputFlowFileContent"; + + createDirectoryAndUploadFile(directory, filename, TEST_FILE_CONTENT); + + configureProxyService(); + + // WHEN + // THEN + testSuccessfulFetch(fileSystemName, directory, filename, inputFlowFileContent, TEST_FILE_CONTENT); + } + @Test public void testFetchFileFromRoot() { // GIVEN @@ -345,7 +362,7 @@ public class ITFetchAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT } @Test - public void testFetchWithRangeZeroOne() throws Exception { + public void testFetchWithRangeZeroOne() { // GIVEN String directory= "A Test Directory"; String filename = "testFile.txt"; @@ -359,7 +376,7 @@ public class ITFetchAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT } @Test - public void testFetchWithRangeOneOne() throws Exception { + public void testFetchWithRangeOneOne() { // GIVEN String directory= "A Test Directory"; String filename = "testFile.txt"; @@ -373,7 +390,7 @@ public class ITFetchAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT } @Test - public void testFetchWithRangeTwentyThreeTwentySix() throws Exception { + public void testFetchWithRangeTwentyThreeTwentySix() { // GIVEN String directory= "A Test Directory"; String filename = "testFile.txt"; @@ -387,7 +404,7 @@ public class ITFetchAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT } @Test - public void testFetchWithRangeLengthGreater() throws Exception { + public void testFetchWithRangeLengthGreater() { // GIVEN String directory= "A Test Directory"; String filename = "testFile.txt"; @@ -401,7 +418,7 @@ public class ITFetchAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT } @Test - public void testFetchWithRangeLengthUnset() throws Exception { + public void testFetchWithRangeLengthUnset() { // GIVEN String directory= "A Test Directory"; String filename = "testFile.txt"; @@ -415,7 +432,7 @@ public class ITFetchAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT } @Test - public void testFetchWithRangeStartOutOfRange() throws Exception { + public void testFetchWithRangeStartOutOfRange() { // GIVEN String directory= "A Test Directory"; String filename = "testFile.txt"; diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITListAzureBlobStorage_v12.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITListAzureBlobStorage_v12.java index 942e834074..5655cf622f 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITListAzureBlobStorage_v12.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITListAzureBlobStorage_v12.java @@ -55,6 +55,17 @@ public class ITListAzureBlobStorage_v12 extends AbstractAzureBlobStorage_v12IT { assertSuccess(BLOB_NAME_1, BLOB_NAME_2, BLOB_NAME_3, BLOB_NAME_4); } + @Test + public void testListBlobsUsingProxyConfigurationService() throws Exception { + uploadBlobs(); + + configureProxyService(); + + runProcessor(); + + assertSuccess(BLOB_NAME_1, BLOB_NAME_2, BLOB_NAME_3, BLOB_NAME_4); + } + @Test public void testListBlobsWithPrefix_1() throws Exception { uploadBlobs(); diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITListAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITListAzureDataLakeStorage.java index 09ca9b2cfe..6c45663e33 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITListAzureDataLakeStorage.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITListAzureDataLakeStorage.java @@ -89,6 +89,16 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { assertSuccess("file1", "file2", "dir1/file11", "dir1/file12", "dir1/dir11/file111", "dir 2/file 21"); } + @Test + public void testListRootRecursiveUsingProxyConfigurationService() throws Exception { + runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, ""); + configureProxyService(); + + runProcessor(); + + assertSuccess("file1", "file2", "dir1/file11", "dir1/file12", "dir1/dir11/file111", "dir 2/file 21"); + } + @Test public void testListRootNonRecursive() throws Exception { runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, ""); @@ -287,7 +297,7 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { } } - private void assertFlowFile(TestFile testFile, MockFlowFile flowFile) throws Exception { + private void assertFlowFile(TestFile testFile, MockFlowFile flowFile) { flowFile.assertAttributeEquals(ATTR_NAME_FILESYSTEM, fileSystemName); flowFile.assertAttributeEquals(ATTR_NAME_FILE_PATH, testFile.getFilePath()); flowFile.assertAttributeEquals(ATTR_NAME_DIRECTORY, testFile.getDirectory()); diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITMoveAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITMoveAzureDataLakeStorage.java index b98317f733..69ed942d3e 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITMoveAzureDataLakeStorage.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITMoveAzureDataLakeStorage.java @@ -81,6 +81,17 @@ public class ITMoveAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { assertSuccess(SOURCE_DIRECTORY, DESTINATION_DIRECTORY, FILE_NAME, FILE_DATA); } + @Test + public void testMoveFileToExistingDirectoryUsingProxyConfigurationService() throws Exception { + createDirectoryAndUploadFile(SOURCE_DIRECTORY, FILE_NAME, FILE_DATA); + createDirectory(DESTINATION_DIRECTORY); + configureProxyService(); + + runProcessor(FILE_DATA); + + assertSuccess(SOURCE_DIRECTORY, DESTINATION_DIRECTORY, FILE_NAME, FILE_DATA); + } + @Test public void testMoveFileToExistingDirectoryWithReplaceResolution() throws Exception { createDirectoryAndUploadFile(SOURCE_DIRECTORY, FILE_NAME, FILE_DATA); diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureBlobStorage_v12.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureBlobStorage_v12.java index 701df85f0e..762012246f 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureBlobStorage_v12.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureBlobStorage_v12.java @@ -54,6 +54,15 @@ public class ITPutAzureBlobStorage_v12 extends AbstractAzureBlobStorage_v12IT { assertSuccess(getContainerName(), BLOB_NAME, BLOB_DATA); } + @Test + public void testPutBlobWithSimpleNameUsingProxyConfigurationService() throws Exception { + configureProxyService(); + + runProcessor(BLOB_DATA); + + assertSuccess(getContainerName(), BLOB_NAME, BLOB_DATA); + } + @Test public void testPutBlobWithCompoundName() throws Exception { String blobName = "dir1/dir2/blob1"; diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureDataLakeStorage.java index 18f4341e8b..20efbb4372 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureDataLakeStorage.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureDataLakeStorage.java @@ -39,9 +39,9 @@ import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_FILENAME; import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_FILESYSTEM; import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_LENGTH; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.Mockito.doThrow; @@ -78,6 +78,16 @@ public class ITPutAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { assertSuccess(DIRECTORY, FILE_NAME, FILE_DATA); } + @Test + public void testPutFileToExistingDirectoryUsingProxyConfigurationService() throws Exception { + fileSystemClient.createDirectory(DIRECTORY); + configureProxyService(); + + runProcessor(FILE_DATA); + + assertSuccess(DIRECTORY, FILE_NAME, FILE_DATA); + } + @Test public void testPutFileToExistingDirectoryWithReplaceResolution() throws Exception { fileSystemClient.createDirectory(DIRECTORY); diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-proxy-configuration-api/src/main/java/org/apache/nifi/proxy/ProxyConfiguration.java b/nifi-nar-bundles/nifi-standard-services/nifi-proxy-configuration-api/src/main/java/org/apache/nifi/proxy/ProxyConfiguration.java index e6d498c20c..0b9086275c 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-proxy-configuration-api/src/main/java/org/apache/nifi/proxy/ProxyConfiguration.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-proxy-configuration-api/src/main/java/org/apache/nifi/proxy/ProxyConfiguration.java @@ -50,6 +50,10 @@ public class ProxyConfiguration { description.append(" Supported proxies: "); description.append(specs.stream().map(ProxySpec::getDisplayName).collect(Collectors.joining(", "))); + if (specs.contains(SOCKS)) { + description.append(" In case of SOCKS, it is not guaranteed that the selected SOCKS Version will be used by the processor."); + } + return new PropertyDescriptor.Builder() .fromPropertyDescriptor(ProxyConfigurationService.PROXY_CONFIGURATION_SERVICE) .description(description.toString()) @@ -62,7 +66,7 @@ public class ProxyConfiguration { * @return sorted unique specs */ private static Set getUniqueProxySpecs(ProxySpec ... _specs) { - final Set specs = Arrays.stream(_specs).sorted().collect(Collectors.toSet()); + final Set specs = Arrays.stream(_specs).collect(Collectors.toSet()); if (specs.contains(HTTP_AUTH)) { specs.remove(HTTP); } @@ -148,6 +152,7 @@ public class ProxyConfiguration { } private Proxy.Type proxyType = Proxy.Type.DIRECT; + private SocksVersion socksVersion; private String proxyServerHost; private Integer proxyServerPort; private String proxyUserName; @@ -161,6 +166,14 @@ public class ProxyConfiguration { this.proxyType = proxyType; } + public SocksVersion getSocksVersion() { + return socksVersion; + } + + public void setSocksVersion(SocksVersion socksVersion) { + this.socksVersion = socksVersion; + } + public String getProxyServerHost() { return proxyServerHost; } diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-proxy-configuration-api/src/main/java/org/apache/nifi/proxy/SocksVersion.java b/nifi-nar-bundles/nifi-standard-services/nifi-proxy-configuration-api/src/main/java/org/apache/nifi/proxy/SocksVersion.java new file mode 100644 index 0000000000..27faa48861 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-proxy-configuration-api/src/main/java/org/apache/nifi/proxy/SocksVersion.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.proxy; + +public enum SocksVersion { + SOCKS4, + SOCKS5 +} diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-proxy-configuration-bundle/nifi-proxy-configuration/src/main/java/org/apache/nifi/proxy/StandardProxyConfigurationService.java b/nifi-nar-bundles/nifi-standard-services/nifi-proxy-configuration-bundle/nifi-proxy-configuration/src/main/java/org/apache/nifi/proxy/StandardProxyConfigurationService.java index a3b3f516b3..28d49026fb 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-proxy-configuration-bundle/nifi-proxy-configuration/src/main/java/org/apache/nifi/proxy/StandardProxyConfigurationService.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-proxy-configuration-bundle/nifi-proxy-configuration/src/main/java/org/apache/nifi/proxy/StandardProxyConfigurationService.java @@ -37,7 +37,7 @@ import java.util.List; @Tags({"Proxy"}) public class StandardProxyConfigurationService extends AbstractControllerService implements ProxyConfigurationService { - static final PropertyDescriptor PROXY_TYPE = new PropertyDescriptor.Builder() + public static final PropertyDescriptor PROXY_TYPE = new PropertyDescriptor.Builder() .name("proxy-type") .displayName("Proxy Type") .description("Proxy type.") @@ -46,7 +46,17 @@ public class StandardProxyConfigurationService extends AbstractControllerService .required(true) .build(); - static final PropertyDescriptor PROXY_SERVER_HOST = new PropertyDescriptor.Builder() + public static final PropertyDescriptor SOCKS_VERSION = new PropertyDescriptor.Builder() + .name("socks-version") + .displayName("SOCKS Version") + .description("SOCKS Protocol Version") + .allowableValues(SocksVersion.values()) + .defaultValue(SocksVersion.SOCKS5.name()) + .dependsOn(PROXY_TYPE, Proxy.Type.SOCKS.name()) + .required(true) + .build(); + + public static final PropertyDescriptor PROXY_SERVER_HOST = new PropertyDescriptor.Builder() .name("proxy-server-host") .displayName("Proxy Server Host") .description("Proxy server hostname or ip-address.") @@ -54,7 +64,7 @@ public class StandardProxyConfigurationService extends AbstractControllerService .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .build(); - static final PropertyDescriptor PROXY_SERVER_PORT = new PropertyDescriptor.Builder() + public static final PropertyDescriptor PROXY_SERVER_PORT = new PropertyDescriptor.Builder() .name("proxy-server-port") .displayName("Proxy Server Port") .description("Proxy server port number.") @@ -62,7 +72,7 @@ public class StandardProxyConfigurationService extends AbstractControllerService .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .build(); - static final PropertyDescriptor PROXY_USER_NAME = new PropertyDescriptor.Builder() + public static final PropertyDescriptor PROXY_USER_NAME = new PropertyDescriptor.Builder() .name("proxy-user-name") .displayName("Proxy User Name") .description("The name of the proxy client for user authentication.") @@ -70,7 +80,7 @@ public class StandardProxyConfigurationService extends AbstractControllerService .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .build(); - static final PropertyDescriptor PROXY_USER_PASSWORD = new PropertyDescriptor.Builder() + public static final PropertyDescriptor PROXY_USER_PASSWORD = new PropertyDescriptor.Builder() .name("proxy-user-password") .displayName("Proxy User Password") .description("The password of the proxy client for user authentication.") @@ -85,6 +95,7 @@ public class StandardProxyConfigurationService extends AbstractControllerService protected List getSupportedPropertyDescriptors() { final List properties = new ArrayList<>(); properties.add(PROXY_TYPE); + properties.add(SOCKS_VERSION); properties.add(PROXY_SERVER_HOST); properties.add(PROXY_SERVER_PORT); properties.add(PROXY_USER_NAME); @@ -95,7 +106,10 @@ public class StandardProxyConfigurationService extends AbstractControllerService @OnEnabled public void setConfiguredValues(final ConfigurationContext context) { configuration = new ProxyConfiguration(); - configuration.setProxyType(Proxy.Type.valueOf(context.getProperty(PROXY_TYPE).getValue())); + + final Proxy.Type proxyType = Proxy.Type.valueOf(context.getProperty(PROXY_TYPE).getValue()); + configuration.setProxyType(proxyType); + configuration.setSocksVersion(proxyType == Proxy.Type.SOCKS ? SocksVersion.valueOf(context.getProperty(SOCKS_VERSION).getValue()) : null); configuration.setProxyServerHost(context.getProperty(PROXY_SERVER_HOST).evaluateAttributeExpressions().getValue()); configuration.setProxyServerPort(context.getProperty(PROXY_SERVER_PORT).evaluateAttributeExpressions().asInteger()); configuration.setProxyUserName(context.getProperty(PROXY_USER_NAME).evaluateAttributeExpressions().getValue());