NIFI-9951 Add proxy support to Azure ADLS and Blob v12 processors

This closes #5990.

Signed-off-by: Peter Turcsanyi <turcsanyi@apache.org>
This commit is contained in:
Nandor Soma Abonyi 2022-04-20 15:05:05 +02:00 committed by Peter Turcsanyi
parent e5203db960
commit f66540eb6d
26 changed files with 377 additions and 100 deletions

View File

@ -188,6 +188,12 @@
<artifactId>nifi-distributed-cache-client-service-api</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-proxy-configuration</artifactId>
<version>1.17.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>

View File

@ -18,6 +18,8 @@ package org.apache.nifi.processors.azure;
import com.azure.core.credential.AzureSasCredential;
import com.azure.core.credential.TokenCredential;
import com.azure.core.http.HttpClient;
import com.azure.core.http.netty.NettyAsyncHttpClientBuilder;
import com.azure.identity.ClientSecretCredentialBuilder;
import com.azure.identity.ManagedIdentityCredentialBuilder;
import com.azure.storage.blob.BlobClient;
@ -45,6 +47,7 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.getProxyOptions;
import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_NAME_BLOBNAME;
import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_NAME_BLOBTYPE;
import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_NAME_CONTAINER;
@ -110,12 +113,19 @@ public abstract class AbstractAzureBlobProcessor_v12 extends AbstractProcessor {
}
public static BlobServiceClient createStorageClient(PropertyContext context) {
AzureStorageCredentialsService_v12 credentialsService = context.getProperty(STORAGE_CREDENTIALS_SERVICE).asControllerService(AzureStorageCredentialsService_v12.class);
AzureStorageCredentialsDetails_v12 credentialsDetails = credentialsService.getCredentialsDetails();
final AzureStorageCredentialsService_v12 credentialsService = context.getProperty(STORAGE_CREDENTIALS_SERVICE).asControllerService(AzureStorageCredentialsService_v12.class);
final AzureStorageCredentialsDetails_v12 credentialsDetails = credentialsService.getCredentialsDetails();
BlobServiceClientBuilder clientBuilder = new BlobServiceClientBuilder();
final BlobServiceClientBuilder clientBuilder = new BlobServiceClientBuilder();
clientBuilder.endpoint(String.format("https://%s.%s", credentialsDetails.getAccountName(), credentialsDetails.getEndpointSuffix()));
final NettyAsyncHttpClientBuilder nettyClientBuilder = new NettyAsyncHttpClientBuilder();
nettyClientBuilder.proxy(getProxyOptions(context));
final HttpClient nettyClient = nettyClientBuilder.build();
clientBuilder.httpClient(nettyClient);
configureCredential(clientBuilder, credentialsService, credentialsDetails);
return clientBuilder.buildClient();
@ -131,7 +141,7 @@ public abstract class AbstractAzureBlobProcessor_v12 extends AbstractProcessor {
break;
case MANAGED_IDENTITY:
clientBuilder.credential(new ManagedIdentityCredentialBuilder()
.clientId(credentialsDetails.getManagedIdentityClientId())
.clientId(credentialsDetails.getManagedIdentityClientId())
.build());
break;
case SERVICE_PRINCIPAL:

View File

@ -18,6 +18,8 @@ package org.apache.nifi.processors.azure;
import com.azure.core.credential.AccessToken;
import com.azure.core.credential.TokenCredential;
import com.azure.core.http.HttpClient;
import com.azure.core.http.netty.NettyAsyncHttpClientBuilder;
import com.azure.identity.ClientSecretCredential;
import com.azure.identity.ClientSecretCredentialBuilder;
import com.azure.identity.ManagedIdentityCredential;
@ -45,11 +47,11 @@ import reactor.core.publisher.Mono;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_FILENAME;
import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.getProxyOptions;
public abstract class AbstractAzureDataLakeStorageProcessor extends AbstractProcessor {
@ -94,23 +96,11 @@ public abstract class AbstractAzureDataLakeStorageProcessor extends AbstractProc
"Files that could not be written to Azure storage for some reason are transferred to this relationship")
.build();
private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
ADLS_CREDENTIALS_SERVICE,
FILESYSTEM,
DIRECTORY,
FILE
));
private static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
REL_SUCCESS,
REL_FAILURE
)));
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return PROPERTIES;
}
@Override
public Set<Relationship> getRelationships() {
return RELATIONSHIPS;
@ -136,43 +126,41 @@ public abstract class AbstractAzureDataLakeStorageProcessor extends AbstractProc
final String endpoint = String.format("https://%s.%s", accountName, endpointSuffix);
final DataLakeServiceClient storageClient;
final DataLakeServiceClientBuilder dataLakeServiceClientBuilder = new DataLakeServiceClientBuilder();
dataLakeServiceClientBuilder.endpoint(endpoint);
if (StringUtils.isNotBlank(accountKey)) {
final StorageSharedKeyCredential credential = new StorageSharedKeyCredential(accountName,
accountKey);
storageClient = new DataLakeServiceClientBuilder().endpoint(endpoint).credential(credential)
.buildClient();
final StorageSharedKeyCredential credential = new StorageSharedKeyCredential(accountName, accountKey);
dataLakeServiceClientBuilder.credential(credential);
} else if (StringUtils.isNotBlank(sasToken)) {
storageClient = new DataLakeServiceClientBuilder().endpoint(endpoint).sasToken(sasToken)
.buildClient();
dataLakeServiceClientBuilder.sasToken(sasToken);
} else if (accessToken != null) {
final TokenCredential credential = tokenRequestContext -> Mono.just(accessToken);
storageClient = new DataLakeServiceClientBuilder().endpoint(endpoint).credential(credential)
.buildClient();
dataLakeServiceClientBuilder.credential(credential);
} else if (useManagedIdentity) {
final ManagedIdentityCredential misCredential = new ManagedIdentityCredentialBuilder()
.clientId(managedIdentityClientId)
.build();
storageClient = new DataLakeServiceClientBuilder()
.endpoint(endpoint)
.credential(misCredential)
.buildClient();
dataLakeServiceClientBuilder.credential(misCredential);
} else if (StringUtils.isNoneBlank(servicePrincipalTenantId, servicePrincipalClientId, servicePrincipalClientSecret)) {
final ClientSecretCredential credential = new ClientSecretCredentialBuilder()
.tenantId(servicePrincipalTenantId)
.clientId(servicePrincipalClientId)
.clientSecret(servicePrincipalClientSecret)
.build();
storageClient = new DataLakeServiceClientBuilder()
.endpoint(endpoint)
.credential(credential)
.buildClient();
dataLakeServiceClientBuilder.credential(credential);
} else {
throw new IllegalArgumentException("No valid credentials were provided");
}
final NettyAsyncHttpClientBuilder nettyClientBuilder = new NettyAsyncHttpClientBuilder();
nettyClientBuilder.proxy(getProxyOptions(context));
final HttpClient nettyClient = nettyClientBuilder.build();
dataLakeServiceClientBuilder.httpClient(nettyClient);
final DataLakeServiceClient storageClient = dataLakeServiceClientBuilder.buildClient();
return storageClient;
}

View File

@ -40,8 +40,8 @@ import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
@Tags({ "azure", "microsoft", "cloud", "storage", "blob" })
@SeeAlso({ ListAzureBlobStorage_v12.class, FetchAzureBlobStorage_v12.class, PutAzureBlobStorage_v12.class})
@Tags({"azure", "microsoft", "cloud", "storage", "blob"})
@SeeAlso({ListAzureBlobStorage_v12.class, FetchAzureBlobStorage_v12.class, PutAzureBlobStorage_v12.class})
@CapabilityDescription("Deletes the specified blob from Azure Blob Storage. The processor uses Azure Blob Storage client library v12.")
@InputRequirement(Requirement.INPUT_REQUIRED)
public class DeleteAzureBlobStorage_v12 extends AbstractAzureBlobProcessor_v12 {
@ -66,11 +66,12 @@ public class DeleteAzureBlobStorage_v12 extends AbstractAzureBlobProcessor_v12 {
STORAGE_CREDENTIALS_SERVICE,
AzureStorageUtils.CONTAINER,
BLOB_NAME,
DELETE_SNAPSHOTS_OPTION
DELETE_SNAPSHOTS_OPTION,
AzureStorageUtils.PROXY_CONFIGURATION_SERVICE
));
@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return PROPERTIES;
}

View File

@ -36,6 +36,7 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor;
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
import java.time.Duration;
import java.util.Arrays;
@ -77,7 +78,8 @@ public class DeleteAzureDataLakeStorage extends AbstractAzureDataLakeStorageProc
FILESYSTEM,
FILESYSTEM_OBJECT_TYPE,
DIRECTORY,
FILE
FILE,
AzureStorageUtils.PROXY_CONFIGURATION_SERVICE
));
@Override

View File

@ -63,11 +63,11 @@ import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR
import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_NAME_PRIMARY_URI;
import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_NAME_TIMESTAMP;
@Tags({ "azure", "microsoft", "cloud", "storage", "blob" })
@Tags({"azure", "microsoft", "cloud", "storage", "blob"})
@CapabilityDescription("Retrieves the specified blob from Azure Blob Storage and writes its content to the content of the FlowFile. The processor uses Azure Blob Storage client library v12.")
@SeeAlso({ ListAzureBlobStorage_v12.class, PutAzureBlobStorage_v12.class, DeleteAzureBlobStorage_v12.class })
@SeeAlso({ListAzureBlobStorage_v12.class, PutAzureBlobStorage_v12.class, DeleteAzureBlobStorage_v12.class})
@InputRequirement(Requirement.INPUT_REQUIRED)
@WritesAttributes({ @WritesAttribute(attribute = ATTR_NAME_CONTAINER, description = ATTR_DESCRIPTION_CONTAINER),
@WritesAttributes({@WritesAttribute(attribute = ATTR_NAME_CONTAINER, description = ATTR_DESCRIPTION_CONTAINER),
@WritesAttribute(attribute = ATTR_NAME_BLOBNAME, description = ATTR_DESCRIPTION_BLOBNAME),
@WritesAttribute(attribute = ATTR_NAME_PRIMARY_URI, description = ATTR_DESCRIPTION_PRIMARY_URI),
@WritesAttribute(attribute = ATTR_NAME_ETAG, description = ATTR_DESCRIPTION_ETAG),
@ -75,7 +75,7 @@ import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR
@WritesAttribute(attribute = ATTR_NAME_MIME_TYPE, description = ATTR_DESCRIPTION_MIME_TYPE),
@WritesAttribute(attribute = ATTR_NAME_LANG, description = ATTR_DESCRIPTION_LANG),
@WritesAttribute(attribute = ATTR_NAME_TIMESTAMP, description = ATTR_DESCRIPTION_TIMESTAMP),
@WritesAttribute(attribute = ATTR_NAME_LENGTH, description = ATTR_DESCRIPTION_LENGTH) })
@WritesAttribute(attribute = ATTR_NAME_LENGTH, description = ATTR_DESCRIPTION_LENGTH)})
public class FetchAzureBlobStorage_v12 extends AbstractAzureBlobProcessor_v12 {
public static final PropertyDescriptor CONTAINER = new PropertyDescriptor.Builder()
@ -113,11 +113,12 @@ public class FetchAzureBlobStorage_v12 extends AbstractAzureBlobProcessor_v12 {
CONTAINER,
BLOB_NAME,
RANGE_START,
RANGE_LENGTH
RANGE_LENGTH,
AzureStorageUtils.PROXY_CONFIGURATION_SERVICE
));
@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return PROPERTIES;
}

View File

@ -37,8 +37,10 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor;
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
@ -78,13 +80,20 @@ public class FetchAzureDataLakeStorage extends AbstractAzureDataLakeStorageProce
.defaultValue("0")
.build();
private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
ADLS_CREDENTIALS_SERVICE,
FILESYSTEM,
DIRECTORY,
FILE,
RANGE_START,
RANGE_LENGTH,
NUM_RETRIES,
AzureStorageUtils.PROXY_CONFIGURATION_SERVICE
));
@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
List<PropertyDescriptor> properties = new ArrayList<PropertyDescriptor>(super.getSupportedPropertyDescriptors());
properties.add(RANGE_START);
properties.add(RANGE_LENGTH);
properties.add(NUM_RETRIES);
return properties;
return PROPERTIES;
}
@Override

View File

@ -141,7 +141,8 @@ public class ListAzureBlobStorage_v12 extends AbstractListAzureProcessor<BlobInf
MIN_AGE,
MAX_AGE,
MIN_SIZE,
MAX_SIZE
MAX_SIZE,
AzureStorageUtils.PROXY_CONFIGURATION_SERVICE
));
private BlobServiceClient storageClient;

View File

@ -42,6 +42,7 @@ import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.azure.storage.utils.ADLSFileInfo;
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
import org.apache.nifi.serialization.record.RecordSchema;
import java.io.IOException;
@ -143,7 +144,8 @@ public class ListAzureDataLakeStorage extends AbstractListAzureProcessor<ADLSFil
MIN_AGE,
MAX_AGE,
MIN_SIZE,
MAX_SIZE));
MAX_SIZE,
AzureStorageUtils.PROXY_CONFIGURATION_SERVICE));
private static final Set<PropertyDescriptor> LISTING_RESET_PROPERTIES = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
ADLS_CREDENTIALS_SERVICE,

View File

@ -36,6 +36,7 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor;
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
import java.util.Arrays;
import java.util.Collections;
@ -131,7 +132,8 @@ public class MoveAzureDataLakeStorage extends AbstractAzureDataLakeStorageProces
DESTINATION_FILESYSTEM,
DESTINATION_DIRECTORY,
FILE,
CONFLICT_RESOLUTION
CONFLICT_RESOLUTION,
AzureStorageUtils.PROXY_CONFIGURATION_SERVICE
));
@Override

View File

@ -63,11 +63,11 @@ import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR
import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_NAME_PRIMARY_URI;
import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_NAME_TIMESTAMP;
@Tags({ "azure", "microsoft", "cloud", "storage", "blob" })
@SeeAlso({ ListAzureBlobStorage_v12.class, FetchAzureBlobStorage_v12.class, DeleteAzureBlobStorage_v12.class })
@Tags({"azure", "microsoft", "cloud", "storage", "blob"})
@SeeAlso({ListAzureBlobStorage_v12.class, FetchAzureBlobStorage_v12.class, DeleteAzureBlobStorage_v12.class})
@CapabilityDescription("Puts content into a blob on Azure Blob Storage. The processor uses Azure Blob Storage client library v12.")
@InputRequirement(Requirement.INPUT_REQUIRED)
@WritesAttributes({ @WritesAttribute(attribute = ATTR_NAME_CONTAINER, description = ATTR_DESCRIPTION_CONTAINER),
@WritesAttributes({@WritesAttribute(attribute = ATTR_NAME_CONTAINER, description = ATTR_DESCRIPTION_CONTAINER),
@WritesAttribute(attribute = ATTR_NAME_BLOBNAME, description = ATTR_DESCRIPTION_BLOBNAME),
@WritesAttribute(attribute = ATTR_NAME_PRIMARY_URI, description = ATTR_DESCRIPTION_PRIMARY_URI),
@WritesAttribute(attribute = ATTR_NAME_ETAG, description = ATTR_DESCRIPTION_ETAG),
@ -75,7 +75,7 @@ import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR
@WritesAttribute(attribute = ATTR_NAME_MIME_TYPE, description = ATTR_DESCRIPTION_MIME_TYPE),
@WritesAttribute(attribute = ATTR_NAME_LANG, description = ATTR_DESCRIPTION_LANG),
@WritesAttribute(attribute = ATTR_NAME_TIMESTAMP, description = ATTR_DESCRIPTION_TIMESTAMP),
@WritesAttribute(attribute = ATTR_NAME_LENGTH, description = ATTR_DESCRIPTION_LENGTH) })
@WritesAttribute(attribute = ATTR_NAME_LENGTH, description = ATTR_DESCRIPTION_LENGTH)})
public class PutAzureBlobStorage_v12 extends AbstractAzureBlobProcessor_v12 {
public static final PropertyDescriptor CREATE_CONTAINER = new PropertyDescriptor.Builder()
@ -87,19 +87,20 @@ public class PutAzureBlobStorage_v12 extends AbstractAzureBlobProcessor_v12 {
.allowableValues("true", "false")
.defaultValue("false")
.description("Specifies whether to check if the container exists and to automatically create it if it does not. " +
"Permission to list containers is required. If false, this check is not made, but the Put operation " +
"will fail if the container does not exist.")
"Permission to list containers is required. If false, this check is not made, but the Put operation " +
"will fail if the container does not exist.")
.build();
private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
STORAGE_CREDENTIALS_SERVICE,
AzureStorageUtils.CONTAINER,
CREATE_CONTAINER,
BLOB_NAME
BLOB_NAME,
AzureStorageUtils.PROXY_CONFIGURATION_SERVICE
));
@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return PROPERTIES;
}

View File

@ -33,13 +33,13 @@ import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor;
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
import java.io.BufferedInputStream;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@ -83,18 +83,18 @@ public class PutAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcess
.allowableValues(FAIL_RESOLUTION, REPLACE_RESOLUTION, IGNORE_RESOLUTION)
.build();
private List<PropertyDescriptor> properties;
@Override
protected void init(final ProcessorInitializationContext context) {
final List<PropertyDescriptor> props = new ArrayList<>(super.getSupportedPropertyDescriptors());
props.add(CONFLICT_RESOLUTION);
properties = Collections.unmodifiableList(props);
}
private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
ADLS_CREDENTIALS_SERVICE,
FILESYSTEM,
DIRECTORY,
FILE,
CONFLICT_RESOLUTION,
AzureStorageUtils.PROXY_CONFIGURATION_SERVICE
));
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return properties;
return PROPERTIES;
}
@Override

View File

@ -16,6 +16,9 @@
*/
package org.apache.nifi.processors.azure.storage.utils;
import com.azure.core.http.ProxyOptions;
import java.net.InetSocketAddress;
import java.net.Proxy;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
@ -43,9 +46,11 @@ import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.proxy.ProxyConfiguration;
import org.apache.nifi.proxy.ProxySpec;
import org.apache.nifi.proxy.SocksVersion;
import org.apache.nifi.services.azure.storage.AzureStorageCredentialsDetails;
import org.apache.nifi.services.azure.storage.AzureStorageCredentialsService;
import org.apache.nifi.services.azure.storage.AzureStorageEmulatorCredentialsDetails;
import reactor.netty.http.client.HttpClient;
public final class AzureStorageUtils {
public static final String BLOCK = "Block";
@ -311,4 +316,43 @@ public final class AzureStorageUtils {
final ProxyConfiguration proxyConfig = ProxyConfiguration.getConfiguration(processContext);
operationContext.setProxy(proxyConfig.createProxy());
}
/**
*
* Creates the {@link ProxyOptions proxy options} that {@link HttpClient} will use.
*
* @param propertyContext is sed to supply Proxy configurations
* @return {@link ProxyOptions proxy options}, null if Proxy is not set
*/
public static ProxyOptions getProxyOptions(final PropertyContext propertyContext) {
final ProxyConfiguration proxyConfiguration = ProxyConfiguration.getConfiguration(propertyContext);
if (proxyConfiguration != ProxyConfiguration.DIRECT_CONFIGURATION) {
final ProxyOptions proxyOptions = new ProxyOptions(
getProxyType(proxyConfiguration),
new InetSocketAddress(proxyConfiguration.getProxyServerHost(), proxyConfiguration.getProxyServerPort()));
final String proxyUserName = proxyConfiguration.getProxyUserName();
final String proxyUserPassword = proxyConfiguration.getProxyUserPassword();
if (proxyUserName != null && proxyUserPassword != null) {
proxyOptions.setCredentials(proxyUserName, proxyUserPassword);
}
return proxyOptions;
}
return null;
}
private static ProxyOptions.Type getProxyType(ProxyConfiguration proxyConfiguration) {
if (proxyConfiguration.getProxyType() == Proxy.Type.HTTP) {
return ProxyOptions.Type.HTTP;
} else if (proxyConfiguration.getProxyType() == Proxy.Type.SOCKS) {
final SocksVersion socksVersion = proxyConfiguration.getSocksVersion();
return ProxyOptions.Type.valueOf(socksVersion.name());
} else {
throw new IllegalArgumentException("Unsupported proxy type: " + proxyConfiguration.getProxyType());
}
}
}

View File

@ -21,12 +21,13 @@ import com.microsoft.azure.storage.StorageCredentials;
import com.microsoft.azure.storage.StorageCredentialsAccountAndKey;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
import org.apache.nifi.proxy.StandardProxyConfigurationService;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.services.azure.storage.AzureStorageCredentialsControllerService;
import org.apache.nifi.services.azure.storage.AzureStorageCredentialsService;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.apache.nifi.util.file.FileUtils;
import org.junit.jupiter.api.BeforeEach;
import java.io.FileInputStream;
@ -36,32 +37,66 @@ import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
public abstract class AbstractAzureStorageIT {
private static final Properties CONFIG;
private static final Properties CREDENTIALS_CONFIG;
private static final Properties PROXY_CONFIG;
private static final String CREDENTIALS_FILE = System.getProperty("user.home") + "/azure-credentials.PROPERTIES";
private static final String PROXY_CONFIGURATION_FILE = System.getProperty("user.home") + "/proxy-configuration.PROPERTIES";
static {
CONFIG = new Properties();
CREDENTIALS_CONFIG = loadConfig(CREDENTIALS_FILE);
PROXY_CONFIG = loadConfig(PROXY_CONFIGURATION_FILE);
}
private static Properties loadConfig(String configPath) {
Properties loadedProperties = new Properties();
assertDoesNotThrow(() -> {
final FileInputStream fis = new FileInputStream(CREDENTIALS_FILE);
assertDoesNotThrow(() -> CONFIG.load(fis));
FileUtils.closeQuietly(fis);
final FileInputStream fIS = new FileInputStream(configPath);
assertDoesNotThrow(() -> loadedProperties.load(fIS));
FileUtils.closeQuietly(fIS);
});
return loadedProperties;
}
protected String getAccountName() {
return CONFIG.getProperty("accountName");
return CREDENTIALS_CONFIG.getProperty("accountName");
}
protected String getAccountKey() {
return CONFIG.getProperty("accountKey");
return CREDENTIALS_CONFIG.getProperty("accountKey");
}
protected String getEndpointSuffix() {
String endpointSuffix = CONFIG.getProperty("endpointSuffix");
String endpointSuffix = CREDENTIALS_CONFIG.getProperty("endpointSuffix");
return endpointSuffix != null ? endpointSuffix : getDefaultEndpointSuffix();
}
protected String getProxyType() {
return PROXY_CONFIG.getProperty("proxyType");
}
protected String getSocksVersion() {
return PROXY_CONFIG.getProperty("socksVersion");
}
protected String getProxyServerHost() {
return PROXY_CONFIG.getProperty("proxyServerHost");
}
protected String getProxyServerPort() {
return PROXY_CONFIG.getProperty("proxyServerPort");
}
protected String getProxyUsername() {
return PROXY_CONFIG.getProperty("proxyUsername");
}
protected String getProxyUserPassword() {
return PROXY_CONFIG.getProperty("proxyUserPassword");
}
protected abstract String getDefaultEndpointSuffix();
protected TestRunner runner;
@ -102,4 +137,22 @@ public abstract class AbstractAzureStorageIT {
runner.setProperty(AzureStorageUtils.STORAGE_CREDENTIALS_SERVICE, credentialsService.getIdentifier());
}
protected void configureProxyService() throws InitializationException {
final StandardProxyConfigurationService proxyConfigurationService = new StandardProxyConfigurationService();
runner.addControllerService("proxy-configuration-service", proxyConfigurationService);
runner.setProperty(proxyConfigurationService, StandardProxyConfigurationService.PROXY_TYPE, getProxyType());
runner.setProperty(proxyConfigurationService, StandardProxyConfigurationService.SOCKS_VERSION, getSocksVersion());
runner.setProperty(proxyConfigurationService, StandardProxyConfigurationService.PROXY_SERVER_HOST, getProxyServerHost());
runner.setProperty(proxyConfigurationService, StandardProxyConfigurationService.PROXY_SERVER_PORT, getProxyServerPort());
runner.setProperty(proxyConfigurationService, StandardProxyConfigurationService.PROXY_USER_NAME, getProxyUsername());
runner.setProperty(proxyConfigurationService, StandardProxyConfigurationService.PROXY_USER_PASSWORD, getProxyUserPassword());
runner.assertValid(proxyConfigurationService);
runner.enableControllerService(proxyConfigurationService);
runner.setProperty(AzureStorageUtils.PROXY_CONFIGURATION_SERVICE, proxyConfigurationService.getIdentifier());
}
}

View File

@ -56,6 +56,17 @@ public class ITDeleteAzureBlobStorage_v12 extends AbstractAzureBlobStorage_v12IT
assertSuccess(BLOB_NAME);
}
@Test
public void testDeleteBlobWithSimpleNameUsingProxyConfigurationService() throws Exception {
uploadBlob(BLOB_NAME, BLOB_DATA);
configureProxyService();
runProcessor();
assertSuccess(BLOB_NAME);
}
@Test
public void testDeleteBlobWithCompoundName() throws Exception {
String blobName = "dir1/dir2/blob1";

View File

@ -22,6 +22,7 @@ import com.azure.storage.file.datalake.models.DataLakeStorageException;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.MockFlowFile;
import org.junit.jupiter.api.Test;
@ -57,6 +58,23 @@ public class ITDeleteAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT
testSuccessfulDelete(fileSystemName, directory, null, inputFlowFileContent, inputFlowFileContent);
}
@Test
public void testDeleteDirectoryWithFilesUsingProxyConfigurationService() throws InitializationException {
// GIVEN
String directory = "TestDirectory";
String filename = "testFile.txt";
String fileContent = "AzureFileContent";
String inputFlowFileContent = "InputFlowFileContent";
createDirectoryAndUploadFile(directory, filename, fileContent);
configureProxyService();
// WHEN
// THEN
testSuccessfulDelete(fileSystemName, directory, null, inputFlowFileContent, inputFlowFileContent);
}
@Test
public void testDeleteEmptyDirectoryWithFSTypeDirectory() {
// GIVEN

View File

@ -53,6 +53,17 @@ public class ITFetchAzureBlobStorage_v12 extends AbstractAzureBlobStorage_v12IT
assertSuccess(BLOB_NAME, BLOB_DATA);
}
@Test
public void testFetchBlobWithSimpleNameUsingProxyConfigurationService() throws Exception {
uploadBlob(BLOB_NAME, BLOB_DATA);
configureProxyService();
runProcessor();
assertSuccess(BLOB_NAME, BLOB_DATA);
}
@Test
public void testFetchBlobWithCompoundName() throws Exception {
String blobName = "dir1/dir2/blob1";

View File

@ -21,6 +21,7 @@ import org.apache.nifi.processor.Processor;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.MockFlowFile;
import org.junit.jupiter.api.Test;
@ -56,6 +57,22 @@ public class ITFetchAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT
testSuccessfulFetch(fileSystemName, directory, filename, inputFlowFileContent, TEST_FILE_CONTENT);
}
@Test
public void testFetchFileFromDirectoryUsingProxyConfigurationService() throws InitializationException {
// GIVEN
String directory = "TestDirectory";
String filename = "testFile.txt";
String inputFlowFileContent = "InputFlowFileContent";
createDirectoryAndUploadFile(directory, filename, TEST_FILE_CONTENT);
configureProxyService();
// WHEN
// THEN
testSuccessfulFetch(fileSystemName, directory, filename, inputFlowFileContent, TEST_FILE_CONTENT);
}
@Test
public void testFetchFileFromRoot() {
// GIVEN
@ -345,7 +362,7 @@ public class ITFetchAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT
}
@Test
public void testFetchWithRangeZeroOne() throws Exception {
public void testFetchWithRangeZeroOne() {
// GIVEN
String directory= "A Test Directory";
String filename = "testFile.txt";
@ -359,7 +376,7 @@ public class ITFetchAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT
}
@Test
public void testFetchWithRangeOneOne() throws Exception {
public void testFetchWithRangeOneOne() {
// GIVEN
String directory= "A Test Directory";
String filename = "testFile.txt";
@ -373,7 +390,7 @@ public class ITFetchAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT
}
@Test
public void testFetchWithRangeTwentyThreeTwentySix() throws Exception {
public void testFetchWithRangeTwentyThreeTwentySix() {
// GIVEN
String directory= "A Test Directory";
String filename = "testFile.txt";
@ -387,7 +404,7 @@ public class ITFetchAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT
}
@Test
public void testFetchWithRangeLengthGreater() throws Exception {
public void testFetchWithRangeLengthGreater() {
// GIVEN
String directory= "A Test Directory";
String filename = "testFile.txt";
@ -401,7 +418,7 @@ public class ITFetchAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT
}
@Test
public void testFetchWithRangeLengthUnset() throws Exception {
public void testFetchWithRangeLengthUnset() {
// GIVEN
String directory= "A Test Directory";
String filename = "testFile.txt";
@ -415,7 +432,7 @@ public class ITFetchAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT
}
@Test
public void testFetchWithRangeStartOutOfRange() throws Exception {
public void testFetchWithRangeStartOutOfRange() {
// GIVEN
String directory= "A Test Directory";
String filename = "testFile.txt";

View File

@ -55,6 +55,17 @@ public class ITListAzureBlobStorage_v12 extends AbstractAzureBlobStorage_v12IT {
assertSuccess(BLOB_NAME_1, BLOB_NAME_2, BLOB_NAME_3, BLOB_NAME_4);
}
@Test
public void testListBlobsUsingProxyConfigurationService() throws Exception {
uploadBlobs();
configureProxyService();
runProcessor();
assertSuccess(BLOB_NAME_1, BLOB_NAME_2, BLOB_NAME_3, BLOB_NAME_4);
}
@Test
public void testListBlobsWithPrefix_1() throws Exception {
uploadBlobs();

View File

@ -89,6 +89,16 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
assertSuccess("file1", "file2", "dir1/file11", "dir1/file12", "dir1/dir11/file111", "dir 2/file 21");
}
@Test
public void testListRootRecursiveUsingProxyConfigurationService() throws Exception {
runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "");
configureProxyService();
runProcessor();
assertSuccess("file1", "file2", "dir1/file11", "dir1/file12", "dir1/dir11/file111", "dir 2/file 21");
}
@Test
public void testListRootNonRecursive() throws Exception {
runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "");
@ -287,7 +297,7 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
}
}
private void assertFlowFile(TestFile testFile, MockFlowFile flowFile) throws Exception {
private void assertFlowFile(TestFile testFile, MockFlowFile flowFile) {
flowFile.assertAttributeEquals(ATTR_NAME_FILESYSTEM, fileSystemName);
flowFile.assertAttributeEquals(ATTR_NAME_FILE_PATH, testFile.getFilePath());
flowFile.assertAttributeEquals(ATTR_NAME_DIRECTORY, testFile.getDirectory());

View File

@ -81,6 +81,17 @@ public class ITMoveAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
assertSuccess(SOURCE_DIRECTORY, DESTINATION_DIRECTORY, FILE_NAME, FILE_DATA);
}
@Test
public void testMoveFileToExistingDirectoryUsingProxyConfigurationService() throws Exception {
createDirectoryAndUploadFile(SOURCE_DIRECTORY, FILE_NAME, FILE_DATA);
createDirectory(DESTINATION_DIRECTORY);
configureProxyService();
runProcessor(FILE_DATA);
assertSuccess(SOURCE_DIRECTORY, DESTINATION_DIRECTORY, FILE_NAME, FILE_DATA);
}
@Test
public void testMoveFileToExistingDirectoryWithReplaceResolution() throws Exception {
createDirectoryAndUploadFile(SOURCE_DIRECTORY, FILE_NAME, FILE_DATA);

View File

@ -54,6 +54,15 @@ public class ITPutAzureBlobStorage_v12 extends AbstractAzureBlobStorage_v12IT {
assertSuccess(getContainerName(), BLOB_NAME, BLOB_DATA);
}
@Test
public void testPutBlobWithSimpleNameUsingProxyConfigurationService() throws Exception {
configureProxyService();
runProcessor(BLOB_DATA);
assertSuccess(getContainerName(), BLOB_NAME, BLOB_DATA);
}
@Test
public void testPutBlobWithCompoundName() throws Exception {
String blobName = "dir1/dir2/blob1";

View File

@ -39,9 +39,9 @@ import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR
import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_FILENAME;
import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_FILESYSTEM;
import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_LENGTH;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.doThrow;
@ -78,6 +78,16 @@ public class ITPutAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
assertSuccess(DIRECTORY, FILE_NAME, FILE_DATA);
}
@Test
public void testPutFileToExistingDirectoryUsingProxyConfigurationService() throws Exception {
fileSystemClient.createDirectory(DIRECTORY);
configureProxyService();
runProcessor(FILE_DATA);
assertSuccess(DIRECTORY, FILE_NAME, FILE_DATA);
}
@Test
public void testPutFileToExistingDirectoryWithReplaceResolution() throws Exception {
fileSystemClient.createDirectory(DIRECTORY);

View File

@ -50,6 +50,10 @@ public class ProxyConfiguration {
description.append(" Supported proxies: ");
description.append(specs.stream().map(ProxySpec::getDisplayName).collect(Collectors.joining(", ")));
if (specs.contains(SOCKS)) {
description.append(" In case of SOCKS, it is not guaranteed that the selected SOCKS Version will be used by the processor.");
}
return new PropertyDescriptor.Builder()
.fromPropertyDescriptor(ProxyConfigurationService.PROXY_CONFIGURATION_SERVICE)
.description(description.toString())
@ -62,7 +66,7 @@ public class ProxyConfiguration {
* @return sorted unique specs
*/
private static Set<ProxySpec> getUniqueProxySpecs(ProxySpec ... _specs) {
final Set<ProxySpec> specs = Arrays.stream(_specs).sorted().collect(Collectors.toSet());
final Set<ProxySpec> specs = Arrays.stream(_specs).collect(Collectors.toSet());
if (specs.contains(HTTP_AUTH)) {
specs.remove(HTTP);
}
@ -148,6 +152,7 @@ public class ProxyConfiguration {
}
private Proxy.Type proxyType = Proxy.Type.DIRECT;
private SocksVersion socksVersion;
private String proxyServerHost;
private Integer proxyServerPort;
private String proxyUserName;
@ -161,6 +166,14 @@ public class ProxyConfiguration {
this.proxyType = proxyType;
}
public SocksVersion getSocksVersion() {
return socksVersion;
}
public void setSocksVersion(SocksVersion socksVersion) {
this.socksVersion = socksVersion;
}
public String getProxyServerHost() {
return proxyServerHost;
}

View File

@ -0,0 +1,22 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.proxy;
public enum SocksVersion {
SOCKS4,
SOCKS5
}

View File

@ -37,7 +37,7 @@ import java.util.List;
@Tags({"Proxy"})
public class StandardProxyConfigurationService extends AbstractControllerService implements ProxyConfigurationService {
static final PropertyDescriptor PROXY_TYPE = new PropertyDescriptor.Builder()
public static final PropertyDescriptor PROXY_TYPE = new PropertyDescriptor.Builder()
.name("proxy-type")
.displayName("Proxy Type")
.description("Proxy type.")
@ -46,7 +46,17 @@ public class StandardProxyConfigurationService extends AbstractControllerService
.required(true)
.build();
static final PropertyDescriptor PROXY_SERVER_HOST = new PropertyDescriptor.Builder()
public static final PropertyDescriptor SOCKS_VERSION = new PropertyDescriptor.Builder()
.name("socks-version")
.displayName("SOCKS Version")
.description("SOCKS Protocol Version")
.allowableValues(SocksVersion.values())
.defaultValue(SocksVersion.SOCKS5.name())
.dependsOn(PROXY_TYPE, Proxy.Type.SOCKS.name())
.required(true)
.build();
public static final PropertyDescriptor PROXY_SERVER_HOST = new PropertyDescriptor.Builder()
.name("proxy-server-host")
.displayName("Proxy Server Host")
.description("Proxy server hostname or ip-address.")
@ -54,7 +64,7 @@ public class StandardProxyConfigurationService extends AbstractControllerService
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
static final PropertyDescriptor PROXY_SERVER_PORT = new PropertyDescriptor.Builder()
public static final PropertyDescriptor PROXY_SERVER_PORT = new PropertyDescriptor.Builder()
.name("proxy-server-port")
.displayName("Proxy Server Port")
.description("Proxy server port number.")
@ -62,7 +72,7 @@ public class StandardProxyConfigurationService extends AbstractControllerService
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
static final PropertyDescriptor PROXY_USER_NAME = new PropertyDescriptor.Builder()
public static final PropertyDescriptor PROXY_USER_NAME = new PropertyDescriptor.Builder()
.name("proxy-user-name")
.displayName("Proxy User Name")
.description("The name of the proxy client for user authentication.")
@ -70,7 +80,7 @@ public class StandardProxyConfigurationService extends AbstractControllerService
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
static final PropertyDescriptor PROXY_USER_PASSWORD = new PropertyDescriptor.Builder()
public static final PropertyDescriptor PROXY_USER_PASSWORD = new PropertyDescriptor.Builder()
.name("proxy-user-password")
.displayName("Proxy User Password")
.description("The password of the proxy client for user authentication.")
@ -85,6 +95,7 @@ public class StandardProxyConfigurationService extends AbstractControllerService
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(PROXY_TYPE);
properties.add(SOCKS_VERSION);
properties.add(PROXY_SERVER_HOST);
properties.add(PROXY_SERVER_PORT);
properties.add(PROXY_USER_NAME);
@ -95,7 +106,10 @@ public class StandardProxyConfigurationService extends AbstractControllerService
@OnEnabled
public void setConfiguredValues(final ConfigurationContext context) {
configuration = new ProxyConfiguration();
configuration.setProxyType(Proxy.Type.valueOf(context.getProperty(PROXY_TYPE).getValue()));
final Proxy.Type proxyType = Proxy.Type.valueOf(context.getProperty(PROXY_TYPE).getValue());
configuration.setProxyType(proxyType);
configuration.setSocksVersion(proxyType == Proxy.Type.SOCKS ? SocksVersion.valueOf(context.getProperty(SOCKS_VERSION).getValue()) : null);
configuration.setProxyServerHost(context.getProperty(PROXY_SERVER_HOST).evaluateAttributeExpressions().getValue());
configuration.setProxyServerPort(context.getProperty(PROXY_SERVER_PORT).evaluateAttributeExpressions().asInteger());
configuration.setProxyUserName(context.getProperty(PROXY_USER_NAME).evaluateAttributeExpressions().getValue());