NIFI-10152 Storage client caching in Azure ADLS processors

This closes #6158.

Signed-off-by: Peter Turcsanyi <turcsanyi@apache.org>
Authored by Nandor Soma Abonyi on 2022-06-23 01:25:45 +02:00; committed by Peter Turcsanyi
parent d82ce18f88
commit 320aed024f
9 changed files with 310 additions and 82 deletions

View File

@@ -195,6 +195,11 @@ The following binary components are provided under the Apache Software License v
Reactive Streams Netty Driver
Copyright 2020, Project Reactor
(ASLv2) Caffeine (com.github.ben-manes.caffeine:caffeine:jar:2.9.2 - https://github.com/ben-manes/caffeine)
The following NOTICE information applies:
Caffeine (caching library)
Copyright Ben Manes
************************
Common Development and Distribution License 1.0
************************

View File

@@ -135,6 +135,12 @@
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
</dependency>
<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
<version>2.9.2</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
@@ -159,7 +165,6 @@
<version>1.18.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-schema-registry-service-api</artifactId>

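Caffeine is a new dependency for this module (the 2.9.x line still targets Java 8, matching the NiFi 1.x baseline). As a quick orientation — this sketch is illustrative and not part of the commit — the factory introduced below relies on Cache.get(key, mappingFunction), which computes the value once per key and serves the cached instance afterwards:

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;

public class CaffeineGetDemo {
    public static void main(String[] args) {
        final Cache<String, String> cache = Caffeine.newBuilder()
                .maximumSize(10)
                .build();
        // First call: the mapping function runs and its result is cached.
        final String first = cache.get("account", key -> "client-for-" + key);
        // Second call: the mapping function is skipped; the cached instance is returned.
        final String second = cache.get("account", key -> "client-for-" + key);
        System.out.println(first == second); // true - same cached object
    }
}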
View File

@@ -16,18 +16,10 @@
*/
package org.apache.nifi.processors.azure;
import com.azure.core.credential.AccessToken;
import com.azure.core.credential.TokenCredential;
import com.azure.core.http.HttpClient;
import com.azure.core.http.netty.NettyAsyncHttpClientBuilder;
import com.azure.identity.ClientSecretCredential;
import com.azure.identity.ClientSecretCredentialBuilder;
import com.azure.identity.ManagedIdentityCredential;
import com.azure.identity.ManagedIdentityCredentialBuilder;
import com.azure.storage.common.StorageSharedKeyCredential;
import com.azure.storage.file.datalake.DataLakeServiceClient;
import com.azure.storage.file.datalake.DataLakeServiceClientBuilder;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
@@ -40,9 +32,10 @@ import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
import org.apache.nifi.processors.azure.storage.utils.DataLakeServiceClientFactory;
import org.apache.nifi.services.azure.storage.ADLSCredentialsDetails;
import org.apache.nifi.services.azure.storage.ADLSCredentialsService;
import reactor.core.publisher.Mono;
import java.util.Arrays;
import java.util.Collections;
@@ -51,7 +44,6 @@ import java.util.Map;
import java.util.Set;
import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_FILENAME;
import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.getProxyOptions;
public abstract class AbstractAzureDataLakeStorageProcessor extends AbstractProcessor {
@@ -65,7 +57,7 @@ public abstract class AbstractAzureDataLakeStorageProcessor extends AbstractProc
public static final PropertyDescriptor FILESYSTEM = new PropertyDescriptor.Builder()
.name("filesystem-name").displayName("Filesystem Name")
.description("Name of the Azure Storage File System. It is assumed to be already existing.")
.description("Name of the Azure Storage File System (also called Container). It is assumed to be already existing.")
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.required(true)
@@ -103,65 +95,31 @@ public abstract class AbstractAzureDataLakeStorageProcessor extends AbstractProc
public static final String TEMP_FILE_DIRECTORY = "_nifitempdirectory";
private DataLakeServiceClientFactory clientFactory;
@Override
public Set<Relationship> getRelationships() {
return RELATIONSHIPS;
}
public static DataLakeServiceClient getStorageClient(PropertyContext context, FlowFile flowFile) {
@OnScheduled
public void onScheduled(final ProcessContext context) {
clientFactory = new DataLakeServiceClientFactory(getLogger(), AzureStorageUtils.getProxyOptions(context));
}
@OnStopped
public void onStopped() {
clientFactory = null;
}
public DataLakeServiceClient getStorageClient(PropertyContext context, FlowFile flowFile) {
final Map<String, String> attributes = flowFile != null ? flowFile.getAttributes() : Collections.emptyMap();
final ADLSCredentialsService credentialsService = context.getProperty(ADLS_CREDENTIALS_SERVICE).asControllerService(ADLSCredentialsService.class);
final ADLSCredentialsDetails credentialsDetails = credentialsService.getCredentialsDetails(attributes);
final String accountName = credentialsDetails.getAccountName();
final String accountKey = credentialsDetails.getAccountKey();
final String sasToken = credentialsDetails.getSasToken();
final AccessToken accessToken = credentialsDetails.getAccessToken();
final String endpointSuffix = credentialsDetails.getEndpointSuffix();
final boolean useManagedIdentity = credentialsDetails.getUseManagedIdentity();
final String managedIdentityClientId = credentialsDetails.getManagedIdentityClientId();
final String servicePrincipalTenantId = credentialsDetails.getServicePrincipalTenantId();
final String servicePrincipalClientId = credentialsDetails.getServicePrincipalClientId();
final String servicePrincipalClientSecret = credentialsDetails.getServicePrincipalClientSecret();
final String endpoint = String.format("https://%s.%s", accountName, endpointSuffix);
final DataLakeServiceClientBuilder dataLakeServiceClientBuilder = new DataLakeServiceClientBuilder();
dataLakeServiceClientBuilder.endpoint(endpoint);
if (StringUtils.isNotBlank(accountKey)) {
final StorageSharedKeyCredential credential = new StorageSharedKeyCredential(accountName, accountKey);
dataLakeServiceClientBuilder.credential(credential);
} else if (StringUtils.isNotBlank(sasToken)) {
dataLakeServiceClientBuilder.sasToken(sasToken);
} else if (accessToken != null) {
final TokenCredential credential = tokenRequestContext -> Mono.just(accessToken);
dataLakeServiceClientBuilder.credential(credential);
} else if (useManagedIdentity) {
final ManagedIdentityCredential misCredential = new ManagedIdentityCredentialBuilder()
.clientId(managedIdentityClientId)
.build();
dataLakeServiceClientBuilder.credential(misCredential);
} else if (StringUtils.isNoneBlank(servicePrincipalTenantId, servicePrincipalClientId, servicePrincipalClientSecret)) {
final ClientSecretCredential credential = new ClientSecretCredentialBuilder()
.tenantId(servicePrincipalTenantId)
.clientId(servicePrincipalClientId)
.clientSecret(servicePrincipalClientSecret)
.build();
dataLakeServiceClientBuilder.credential(credential);
} else {
throw new IllegalArgumentException("No valid credentials were provided");
}
final NettyAsyncHttpClientBuilder nettyClientBuilder = new NettyAsyncHttpClientBuilder();
nettyClientBuilder.proxy(getProxyOptions(context));
final HttpClient nettyClient = nettyClientBuilder.build();
dataLakeServiceClientBuilder.httpClient(nettyClient);
final DataLakeServiceClient storageClient = dataLakeServiceClientBuilder.buildClient();
final DataLakeServiceClient storageClient = clientFactory.getStorageClient(credentialsDetails);
return storageClient;
}

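The pattern above — create the factory in @OnScheduled, drop it in @OnStopped, and resolve credentials per FlowFile inside getStorageClient() — is what concrete processors inherit. A sketch of a hypothetical subclass (MyDataLakeProcessor is illustrative only; REL_SUCCESS is assumed to be defined on the base class):

import com.azure.storage.file.datalake.DataLakeServiceClient;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;

public class MyDataLakeProcessor extends AbstractAzureDataLakeStorageProcessor {
    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
        final FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }
        // The credentials service may evaluate expression language against this
        // FlowFile's attributes; equal resolved credentials hit the factory's cache.
        final DataLakeServiceClient storageClient = getStorageClient(context, flowFile);
        // ... use storageClient, e.g. storageClient.getFileSystemClient(...), then:
        session.transfer(flowFile, REL_SUCCESS);
    }
}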
View File

@@ -43,7 +43,10 @@ import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.azure.storage.utils.ADLSFileInfo;
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
import org.apache.nifi.processors.azure.storage.utils.DataLakeServiceClientFactory;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.services.azure.storage.ADLSCredentialsDetails;
import org.apache.nifi.services.azure.storage.ADLSCredentialsService;
import java.io.IOException;
import java.util.Arrays;
@@ -66,7 +69,6 @@ import static org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProce
import static org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY;
import static org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.evaluateDirectoryProperty;
import static org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.evaluateFileSystemProperty;
import static org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.getStorageClient;
import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_DESCRIPTION_DIRECTORY;
import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_DESCRIPTION_ETAG;
import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_DESCRIPTION_FILENAME;
@@ -170,6 +172,8 @@ public class ListAzureDataLakeStorage extends AbstractListAzureProcessor<ADLSFil
private volatile Pattern filePattern;
private volatile Pattern pathPattern;
private DataLakeServiceClientFactory clientFactory;
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return PROPERTIES;
@@ -179,12 +183,14 @@
public void onScheduled(final ProcessContext context) {
filePattern = getPattern(context, FILE_FILTER);
pathPattern = getPattern(context, PATH_FILTER);
clientFactory = new DataLakeServiceClientFactory(getLogger(), AzureStorageUtils.getProxyOptions(context));
}
@OnStopped
public void onStopped() {
filePattern = null;
pathPattern = null;
clientFactory = null;
}
@Override
@@ -264,7 +270,11 @@ public class ListAzureDataLakeStorage extends AbstractListAzureProcessor<ADLSFil
final Pattern filePattern = listingMode == ListingMode.EXECUTION ? this.filePattern : getPattern(context, FILE_FILTER);
final Pattern pathPattern = listingMode == ListingMode.EXECUTION ? this.pathPattern : getPattern(context, PATH_FILTER);
final DataLakeServiceClient storageClient = getStorageClient(context, null);
final ADLSCredentialsService credentialsService = context.getProperty(ADLS_CREDENTIALS_SERVICE).asControllerService(ADLSCredentialsService.class);
final ADLSCredentialsDetails credentialsDetails = credentialsService.getCredentialsDetails(Collections.emptyMap());
final DataLakeServiceClient storageClient = clientFactory.getStorageClient(credentialsDetails);
final DataLakeFileSystemClient fileSystemClient = storageClient.getFileSystemClient(fileSystem);
final ListPathsOptions options = new ListPathsOptions();

View File

@@ -321,7 +321,7 @@ public final class AzureStorageUtils {
*
* Creates the {@link ProxyOptions proxy options} that {@link HttpClient} will use.
*
* @param propertyContext is sed to supply Proxy configurations
* @param propertyContext to supply Proxy configurations
* @return {@link ProxyOptions proxy options}, null if Proxy is not set
*/
public static ProxyOptions getProxyOptions(final PropertyContext propertyContext) {
@@ -342,7 +342,7 @@
return proxyOptions;
}
return null;
}
private static ProxyOptions.Type getProxyType(ProxyConfiguration proxyConfiguration) {

View File

@@ -0,0 +1,125 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.azure.storage.utils;
import com.azure.core.credential.AccessToken;
import com.azure.core.credential.TokenCredential;
import com.azure.core.http.HttpClient;
import com.azure.core.http.ProxyOptions;
import com.azure.core.http.netty.NettyAsyncHttpClientBuilder;
import com.azure.identity.ClientSecretCredential;
import com.azure.identity.ClientSecretCredentialBuilder;
import com.azure.identity.ManagedIdentityCredential;
import com.azure.identity.ManagedIdentityCredentialBuilder;
import com.azure.storage.common.StorageSharedKeyCredential;
import com.azure.storage.file.datalake.DataLakeServiceClient;
import com.azure.storage.file.datalake.DataLakeServiceClientBuilder;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.services.azure.storage.ADLSCredentialsDetails;
import reactor.core.publisher.Mono;
public class DataLakeServiceClientFactory {
private static final long STORAGE_CLIENT_CACHE_SIZE = 10;
private final ComponentLog logger;
private final ProxyOptions proxyOptions;
private final Cache<ADLSCredentialsDetails, DataLakeServiceClient> clientCache;
public DataLakeServiceClientFactory(ComponentLog logger, ProxyOptions proxyOptions) {
this.logger = logger;
this.proxyOptions = proxyOptions;
this.clientCache = createCache();
}
private Cache<ADLSCredentialsDetails, DataLakeServiceClient> createCache() {
// Beware! By default, Caffeine does not perform cleanup and evict values
// "automatically" or instantly after a value expires. Because of that it
// can happen that there are more elements in the cache than the maximum size.
// See: https://github.com/ben-manes/caffeine/wiki/Cleanup
return Caffeine.newBuilder()
.maximumSize(STORAGE_CLIENT_CACHE_SIZE)
.build();
}
/**
* Retrieves a {@link DataLakeServiceClient}
*
* @param credentialsDetails used for caching because it can contain properties that are results of an expression
* @return DataLakeServiceClient
*/
public DataLakeServiceClient getStorageClient(ADLSCredentialsDetails credentialsDetails) {
return clientCache.get(credentialsDetails, __ -> {
logger.debug("DataLakeServiceClient is not found in the cache with the given credentials. Creating it.");
return createStorageClient(credentialsDetails, proxyOptions);
});
}
private static DataLakeServiceClient createStorageClient(ADLSCredentialsDetails credentialsDetails, ProxyOptions proxyOptions) {
final String accountName = credentialsDetails.getAccountName();
final String accountKey = credentialsDetails.getAccountKey();
final String sasToken = credentialsDetails.getSasToken();
final AccessToken accessToken = credentialsDetails.getAccessToken();
final String endpointSuffix = credentialsDetails.getEndpointSuffix();
final boolean useManagedIdentity = credentialsDetails.getUseManagedIdentity();
final String managedIdentityClientId = credentialsDetails.getManagedIdentityClientId();
final String servicePrincipalTenantId = credentialsDetails.getServicePrincipalTenantId();
final String servicePrincipalClientId = credentialsDetails.getServicePrincipalClientId();
final String servicePrincipalClientSecret = credentialsDetails.getServicePrincipalClientSecret();
final String endpoint = String.format("https://%s.%s", accountName, endpointSuffix);
final DataLakeServiceClientBuilder dataLakeServiceClientBuilder = new DataLakeServiceClientBuilder();
dataLakeServiceClientBuilder.endpoint(endpoint);
if (StringUtils.isNotBlank(accountKey)) {
final StorageSharedKeyCredential credential = new StorageSharedKeyCredential(accountName, accountKey);
dataLakeServiceClientBuilder.credential(credential);
} else if (StringUtils.isNotBlank(sasToken)) {
dataLakeServiceClientBuilder.sasToken(sasToken);
} else if (accessToken != null) {
final TokenCredential credential = tokenRequestContext -> Mono.just(accessToken);
dataLakeServiceClientBuilder.credential(credential);
} else if (useManagedIdentity) {
final ManagedIdentityCredential misCredential = new ManagedIdentityCredentialBuilder()
.clientId(managedIdentityClientId)
.build();
dataLakeServiceClientBuilder.credential(misCredential);
} else if (StringUtils.isNoneBlank(servicePrincipalTenantId, servicePrincipalClientId, servicePrincipalClientSecret)) {
final ClientSecretCredential credential = new ClientSecretCredentialBuilder()
.tenantId(servicePrincipalTenantId)
.clientId(servicePrincipalClientId)
.clientSecret(servicePrincipalClientSecret)
.build();
dataLakeServiceClientBuilder.credential(credential);
} else {
throw new IllegalArgumentException("No valid credentials were provided");
}
final NettyAsyncHttpClientBuilder nettyClientBuilder = new NettyAsyncHttpClientBuilder();
nettyClientBuilder.proxy(proxyOptions);
final HttpClient nettyClient = nettyClientBuilder.build();
dataLakeServiceClientBuilder.httpClient(nettyClient);
return dataLakeServiceClientBuilder.buildClient();
}
}

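The createCache() comment above deserves emphasis: Caffeine's size-based eviction is amortized, so the cache can briefly hold more than STORAGE_CLIENT_CACHE_SIZE entries. A standalone sketch (not part of the commit) that makes this observable — estimatedSize() may exceed the configured maximum until cleanUp() forces the pending maintenance:

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;

public class CaffeineCleanupDemo {
    public static void main(String[] args) {
        final Cache<Integer, String> cache = Caffeine.newBuilder()
                .maximumSize(10)
                .build();
        for (int i = 0; i < 1_000; i++) {
            cache.put(i, "client-" + i);
        }
        // May report more than 10 entries: size-based eviction is amortized,
        // not performed synchronously on every write.
        System.out.println("before cleanUp: " + cache.estimatedSize());
        cache.cleanUp(); // run any pending maintenance now
        System.out.println("after cleanUp:  " + cache.estimatedSize());
    }
}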
View File

@@ -98,7 +98,7 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
}
@Test
public void testListRootRecursive() throws Exception {
public void testListRootRecursive() {
runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "");
runProcessor();
@@ -131,7 +131,7 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
}
@Test
public void testListRootNonRecursive() throws Exception {
public void testListRootNonRecursive() {
runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "");
runner.setProperty(ListAzureDataLakeStorage.RECURSE_SUBDIRECTORIES, "false");
@@ -152,7 +152,7 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
}
@Test
public void testListSubdirectoryRecursive() throws Exception {
public void testListSubdirectoryRecursive() {
runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "dir1");
runProcessor();
@@ -173,7 +173,7 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
}
@Test
public void testListSubdirectoryNonRecursive() throws Exception {
public void testListSubdirectoryNonRecursive() {
runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "dir1");
runner.setProperty(ListAzureDataLakeStorage.RECURSE_SUBDIRECTORIES, "false");
@@ -194,7 +194,7 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
}
@Test
public void testListWithFileFilter() throws Exception {
public void testListWithFileFilter() {
runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "");
runner.setProperty(ListAzureDataLakeStorage.FILE_FILTER, ".*file1.*$");
@@ -218,7 +218,7 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
}
@Test
public void testListWithFileFilterWithEL() throws Exception {
public void testListWithFileFilterWithEL() {
runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "");
runner.setProperty(ListAzureDataLakeStorage.FILE_FILTER, ".*file${suffix}$");
runner.setVariable("suffix", "1.*");
@@ -244,7 +244,7 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
}
@Test
public void testListRootWithPathFilter() throws Exception {
public void testListRootWithPathFilter() {
runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "");
runner.setProperty(ListAzureDataLakeStorage.PATH_FILTER, "^dir1.*$");
@@ -267,7 +267,7 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
}
@Test
public void testListRootWithPathFilterWithEL() throws Exception {
public void testListRootWithPathFilterWithEL() {
runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "");
runner.setProperty(ListAzureDataLakeStorage.PATH_FILTER, "${prefix}${suffix}");
runner.setVariable("prefix", "^dir");
@@ -294,7 +294,7 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
}
@Test
public void testListSubdirectoryWithPathFilter() throws Exception {
public void testListSubdirectoryWithPathFilter() {
runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "dir1");
runner.setProperty(ListAzureDataLakeStorage.PATH_FILTER, "dir1.*");
@@ -315,7 +315,7 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
}
@Test
public void testListRootWithFileAndPathFilter() throws Exception {
public void testListRootWithFileAndPathFilter() {
runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "");
runner.setProperty(ListAzureDataLakeStorage.FILE_FILTER, ".*11");
runner.setProperty(ListAzureDataLakeStorage.PATH_FILTER, "dir1.*");
@@ -339,7 +339,7 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
}
@Test
public void testListEmptyDirectory() throws Exception {
public void testListEmptyDirectory() {
runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "dir3");
runProcessor();
@@ -401,7 +401,7 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
}
@Test
public void testListWithMinAge() throws Exception {
public void testListWithMinAge() {
runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "");
runner.setProperty(ListAzureDataLakeStorage.MIN_AGE, "1 hour");
@@ -422,7 +422,7 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
}
@Test
public void testListWithMaxAge() throws Exception {
public void testListWithMaxAge() {
runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "");
runner.setProperty(ListAzureDataLakeStorage.MAX_AGE, "1 hour");
@@ -447,7 +447,7 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
}
@Test
public void testListWithMinSize() throws Exception {
public void testListWithMinSize() {
runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "");
runner.setProperty(ListAzureDataLakeStorage.MIN_SIZE, "5 B");
@@ -471,7 +471,7 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
}
@Test
public void testListWithMaxSize() throws Exception {
public void testListWithMaxSize() {
runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "");
runner.setProperty(ListAzureDataLakeStorage.MAX_SIZE, "5 B");
@@ -496,7 +496,7 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
runner.run();
}
private void assertSuccess(String... testFilePaths) throws Exception {
private void assertSuccess(String... testFilePaths) {
runner.assertTransferCount(ListAzureDataLakeStorage.REL_SUCCESS, testFilePaths.length);
Map<String, TestFile> expectedFiles = new HashMap<>(testFiles);

View File

@@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.azure.storage.utils;
import com.azure.storage.file.datalake.DataLakeServiceClient;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.services.azure.storage.ADLSCredentialsDetails;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import static org.junit.jupiter.api.Assertions.assertNotSame;
import static org.junit.jupiter.api.Assertions.assertSame;
@ExtendWith(MockitoExtension.class)
class DataLakeServiceClientFactoryTest {
@Mock
private ComponentLog logger;
@Test
void testThatServiceClientIsCachedByCredentials() {
final DataLakeServiceClientFactory clientFactory = new DataLakeServiceClientFactory(logger, null);
final ADLSCredentialsDetails credentials = createCredentialDetails("account");
final DataLakeServiceClient clientOne = clientFactory.getStorageClient(credentials);
final DataLakeServiceClient clientTwo = clientFactory.getStorageClient(credentials);
assertSame(clientOne, clientTwo);
}
@Test
void testThatDifferentServiceClientIsReturnedForDifferentCredentials() {
final DataLakeServiceClientFactory clientFactory = new DataLakeServiceClientFactory(logger, null);
final ADLSCredentialsDetails credentialsOne = createCredentialDetails("accountOne");
final ADLSCredentialsDetails credentialsTwo = createCredentialDetails("accountTwo");
final DataLakeServiceClient clientOne = clientFactory.getStorageClient(credentialsOne);
final DataLakeServiceClient clientTwo = clientFactory.getStorageClient(credentialsTwo);
assertNotSame(clientOne, clientTwo);
}
@Test
void testThatCachedClientIsReturnedAfterDifferentClientIsCreated() {
final DataLakeServiceClientFactory clientFactory = new DataLakeServiceClientFactory(logger, null);
final ADLSCredentialsDetails credentialsOne = createCredentialDetails("accountOne");
final ADLSCredentialsDetails credentialsTwo = createCredentialDetails("accountTwo");
final ADLSCredentialsDetails credentialsThree = createCredentialDetails("accountOne");
final DataLakeServiceClient clientOne = clientFactory.getStorageClient(credentialsOne);
final DataLakeServiceClient clientTwo = clientFactory.getStorageClient(credentialsTwo);
final DataLakeServiceClient clientThree = clientFactory.getStorageClient(credentialsThree);
assertNotSame(clientOne, clientTwo);
assertSame(clientOne, clientThree);
}
private ADLSCredentialsDetails createCredentialDetails(String accountName) {
return ADLSCredentialsDetails.Builder.newBuilder()
.setAccountName(accountName)
.setAccountKey("accountKey")
.setEndpointSuffix("dfs.core.windows.net")
.build();
}
}

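These assertSame/assertNotSame assertions hold only because ADLSCredentialsDetails gains value-based equals() and hashCode() in the next file. A small sketch of the key property the cache relies on, using the same Builder calls as the test (the class name CredentialsKeyDemo is illustrative):

import org.apache.nifi.services.azure.storage.ADLSCredentialsDetails;

public class CredentialsKeyDemo {
    public static void main(String[] args) {
        final ADLSCredentialsDetails a = ADLSCredentialsDetails.Builder.newBuilder()
                .setAccountName("account")
                .setAccountKey("accountKey")
                .setEndpointSuffix("dfs.core.windows.net")
                .build();
        final ADLSCredentialsDetails b = ADLSCredentialsDetails.Builder.newBuilder()
                .setAccountName("account")
                .setAccountKey("accountKey")
                .setEndpointSuffix("dfs.core.windows.net")
                .build();
        // Distinct instances that are equal as cache keys, so the factory returns
        // the already-built DataLakeServiceClient instead of creating a new one.
        System.out.println(a.equals(b) && a.hashCode() == b.hashCode()); // true
        System.out.println(a == b); // false
    }
}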
View File

@@ -18,6 +18,8 @@ package org.apache.nifi.services.azure.storage;
import com.azure.core.credential.AccessToken;
import java.util.Objects;
public class ADLSCredentialsDetails {
private final String accountName;
@@ -98,6 +100,45 @@ public class ADLSCredentialsDetails {
return servicePrincipalClientSecret;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
ADLSCredentialsDetails that = (ADLSCredentialsDetails) o;
return useManagedIdentity == that.useManagedIdentity
&& Objects.equals(accountName, that.accountName)
&& Objects.equals(accountKey, that.accountKey)
&& Objects.equals(sasToken, that.sasToken)
&& Objects.equals(endpointSuffix, that.endpointSuffix)
&& Objects.equals(accessToken, that.accessToken)
&& Objects.equals(managedIdentityClientId, that.managedIdentityClientId)
&& Objects.equals(servicePrincipalTenantId, that.servicePrincipalTenantId)
&& Objects.equals(servicePrincipalClientId, that.servicePrincipalClientId)
&& Objects.equals(servicePrincipalClientSecret, that.servicePrincipalClientSecret);
}
@Override
public int hashCode() {
return Objects.hash(
accountName,
accountKey,
sasToken,
endpointSuffix,
accessToken,
useManagedIdentity,
managedIdentityClientId,
servicePrincipalTenantId,
servicePrincipalClientId,
servicePrincipalClientSecret
);
}
public static class Builder {
private String accountName;
private String accountKey;