From 320aed024f62ec87ccdd86e6f71e10d75eebd344 Mon Sep 17 00:00:00 2001 From: Nandor Soma Abonyi Date: Thu, 23 Jun 2022 01:25:45 +0200 Subject: [PATCH] NIFI-10152 Storage client caching in Azure ADLS processors This closes #6158. Signed-off-by: Peter Turcsanyi --- .../src/main/resources/META-INF/NOTICE | 5 + .../nifi-azure-processors/pom.xml | 7 +- ...AbstractAzureDataLakeStorageProcessor.java | 80 +++-------- .../storage/ListAzureDataLakeStorage.java | 14 +- .../storage/utils/AzureStorageUtils.java | 4 +- .../utils/DataLakeServiceClientFactory.java | 125 ++++++++++++++++++ .../storage/ITListAzureDataLakeStorage.java | 32 ++--- .../DataLakeServiceClientFactoryTest.java | 84 ++++++++++++ .../azure/storage/ADLSCredentialsDetails.java | 41 ++++++ 9 files changed, 310 insertions(+), 82 deletions(-) create mode 100644 nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/DataLakeServiceClientFactory.java create mode 100644 nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/utils/DataLakeServiceClientFactoryTest.java diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/src/main/resources/META-INF/NOTICE b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/src/main/resources/META-INF/NOTICE index 5faf77b19f..a4fb770588 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/src/main/resources/META-INF/NOTICE +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/src/main/resources/META-INF/NOTICE @@ -195,6 +195,11 @@ The following binary components are provided under the Apache Software License v Reactive Streams Netty Driver Copyright 2020, Project Reactor + (ASLv2) Caffeine (com.github.ben-manes.caffeine:caffeine:jar:2.9.2 - https://github.com/ben-manes/caffeine) + The following NOTICE information applies: + Caffeine (caching library) + Copyright Ben Manes + ************************ Common Development and Distribution License 1.0 ************************ diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml index 667a7923ed..a6ad604524 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml @@ -135,6 +135,12 @@ commons-io commons-io + + com.github.ben-manes.caffeine + caffeine + 2.9.2 + + org.apache.nifi nifi-mock @@ -159,7 +165,6 @@ 1.18.0-SNAPSHOT test - org.apache.nifi nifi-schema-registry-service-api diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureDataLakeStorageProcessor.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureDataLakeStorageProcessor.java index 1a66dda500..41e677d0a8 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureDataLakeStorageProcessor.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureDataLakeStorageProcessor.java @@ -16,18 +16,10 @@ */ package org.apache.nifi.processors.azure; -import com.azure.core.credential.AccessToken; -import com.azure.core.credential.TokenCredential; -import com.azure.core.http.HttpClient; -import com.azure.core.http.netty.NettyAsyncHttpClientBuilder; -import com.azure.identity.ClientSecretCredential; -import com.azure.identity.ClientSecretCredentialBuilder; -import com.azure.identity.ManagedIdentityCredential; -import com.azure.identity.ManagedIdentityCredentialBuilder; -import com.azure.storage.common.StorageSharedKeyCredential; import com.azure.storage.file.datalake.DataLakeServiceClient; -import com.azure.storage.file.datalake.DataLakeServiceClientBuilder; import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; @@ -40,9 +32,10 @@ import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils; +import org.apache.nifi.processors.azure.storage.utils.DataLakeServiceClientFactory; import org.apache.nifi.services.azure.storage.ADLSCredentialsDetails; import org.apache.nifi.services.azure.storage.ADLSCredentialsService; -import reactor.core.publisher.Mono; import java.util.Arrays; import java.util.Collections; @@ -51,7 +44,6 @@ import java.util.Map; import java.util.Set; import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_FILENAME; -import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.getProxyOptions; public abstract class AbstractAzureDataLakeStorageProcessor extends AbstractProcessor { @@ -65,7 +57,7 @@ public abstract class AbstractAzureDataLakeStorageProcessor extends AbstractProc public static final PropertyDescriptor FILESYSTEM = new PropertyDescriptor.Builder() .name("filesystem-name").displayName("Filesystem Name") - .description("Name of the Azure Storage File System. It is assumed to be already existing.") + .description("Name of the Azure Storage File System (also called Container). It is assumed to be already existing.") .addValidator(StandardValidators.NON_BLANK_VALIDATOR) .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .required(true) @@ -103,65 +95,31 @@ public abstract class AbstractAzureDataLakeStorageProcessor extends AbstractProc public static final String TEMP_FILE_DIRECTORY = "_nifitempdirectory"; + private DataLakeServiceClientFactory clientFactory; + @Override public Set getRelationships() { return RELATIONSHIPS; } - public static DataLakeServiceClient getStorageClient(PropertyContext context, FlowFile flowFile) { + @OnScheduled + public void onScheduled(final ProcessContext context) { + clientFactory = new DataLakeServiceClientFactory(getLogger(), AzureStorageUtils.getProxyOptions(context)); + } + + @OnStopped + public void onStopped() { + clientFactory = null; + } + + public DataLakeServiceClient getStorageClient(PropertyContext context, FlowFile flowFile) { final Map attributes = flowFile != null ? flowFile.getAttributes() : Collections.emptyMap(); final ADLSCredentialsService credentialsService = context.getProperty(ADLS_CREDENTIALS_SERVICE).asControllerService(ADLSCredentialsService.class); final ADLSCredentialsDetails credentialsDetails = credentialsService.getCredentialsDetails(attributes); - final String accountName = credentialsDetails.getAccountName(); - final String accountKey = credentialsDetails.getAccountKey(); - final String sasToken = credentialsDetails.getSasToken(); - final AccessToken accessToken = credentialsDetails.getAccessToken(); - final String endpointSuffix = credentialsDetails.getEndpointSuffix(); - final boolean useManagedIdentity = credentialsDetails.getUseManagedIdentity(); - final String managedIdentityClientId = credentialsDetails.getManagedIdentityClientId(); - final String servicePrincipalTenantId = credentialsDetails.getServicePrincipalTenantId(); - final String servicePrincipalClientId = credentialsDetails.getServicePrincipalClientId(); - final String servicePrincipalClientSecret = credentialsDetails.getServicePrincipalClientSecret(); - - final String endpoint = String.format("https://%s.%s", accountName, endpointSuffix); - - final DataLakeServiceClientBuilder dataLakeServiceClientBuilder = new DataLakeServiceClientBuilder(); - dataLakeServiceClientBuilder.endpoint(endpoint); - - if (StringUtils.isNotBlank(accountKey)) { - final StorageSharedKeyCredential credential = new StorageSharedKeyCredential(accountName, accountKey); - dataLakeServiceClientBuilder.credential(credential); - } else if (StringUtils.isNotBlank(sasToken)) { - dataLakeServiceClientBuilder.sasToken(sasToken); - } else if (accessToken != null) { - final TokenCredential credential = tokenRequestContext -> Mono.just(accessToken); - dataLakeServiceClientBuilder.credential(credential); - } else if (useManagedIdentity) { - final ManagedIdentityCredential misCredential = new ManagedIdentityCredentialBuilder() - .clientId(managedIdentityClientId) - .build(); - dataLakeServiceClientBuilder.credential(misCredential); - } else if (StringUtils.isNoneBlank(servicePrincipalTenantId, servicePrincipalClientId, servicePrincipalClientSecret)) { - final ClientSecretCredential credential = new ClientSecretCredentialBuilder() - .tenantId(servicePrincipalTenantId) - .clientId(servicePrincipalClientId) - .clientSecret(servicePrincipalClientSecret) - .build(); - dataLakeServiceClientBuilder.credential(credential); - } else { - throw new IllegalArgumentException("No valid credentials were provided"); - } - - final NettyAsyncHttpClientBuilder nettyClientBuilder = new NettyAsyncHttpClientBuilder(); - nettyClientBuilder.proxy(getProxyOptions(context)); - - final HttpClient nettyClient = nettyClientBuilder.build(); - dataLakeServiceClientBuilder.httpClient(nettyClient); - - final DataLakeServiceClient storageClient = dataLakeServiceClientBuilder.buildClient(); + final DataLakeServiceClient storageClient = clientFactory.getStorageClient(credentialsDetails); return storageClient; } diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureDataLakeStorage.java index 24f51b5ae2..601a7e285c 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureDataLakeStorage.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureDataLakeStorage.java @@ -43,7 +43,10 @@ import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processors.azure.storage.utils.ADLSFileInfo; import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils; +import org.apache.nifi.processors.azure.storage.utils.DataLakeServiceClientFactory; import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.services.azure.storage.ADLSCredentialsDetails; +import org.apache.nifi.services.azure.storage.ADLSCredentialsService; import java.io.IOException; import java.util.Arrays; @@ -66,7 +69,6 @@ import static org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProce import static org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY; import static org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.evaluateDirectoryProperty; import static org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.evaluateFileSystemProperty; -import static org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.getStorageClient; import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_DESCRIPTION_DIRECTORY; import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_DESCRIPTION_ETAG; import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_DESCRIPTION_FILENAME; @@ -170,6 +172,8 @@ public class ListAzureDataLakeStorage extends AbstractListAzureProcessor getSupportedPropertyDescriptors() { return PROPERTIES; @@ -179,12 +183,14 @@ public class ListAzureDataLakeStorage extends AbstractListAzureProcessor clientCache; + + public DataLakeServiceClientFactory(ComponentLog logger, ProxyOptions proxyOptions) { + this.logger = logger; + this.proxyOptions = proxyOptions; + this.clientCache = createCache(); + } + + private Cache createCache() { + // Beware! By default, Caffeine does not perform cleanup and evict values + // "automatically" or instantly after a value expires. Because of that it + // can happen that there are more elements in the cache than the maximum size. + // See: https://github.com/ben-manes/caffeine/wiki/Cleanup + return Caffeine.newBuilder() + .maximumSize(STORAGE_CLIENT_CACHE_SIZE) + .build(); + } + + /** + * Retrieves a {@link DataLakeServiceClient} + * + * @param credentialsDetails used for caching because it can contain properties that are results of an expression + * @return DataLakeServiceClient + */ + public DataLakeServiceClient getStorageClient(ADLSCredentialsDetails credentialsDetails) { + return clientCache.get(credentialsDetails, __ -> { + logger.debug("DataLakeServiceClient is not found in the cache with the given credentials. Creating it."); + return createStorageClient(credentialsDetails, proxyOptions); + }); + } + + private static DataLakeServiceClient createStorageClient(ADLSCredentialsDetails credentialsDetails, ProxyOptions proxyOptions) { + final String accountName = credentialsDetails.getAccountName(); + final String accountKey = credentialsDetails.getAccountKey(); + final String sasToken = credentialsDetails.getSasToken(); + final AccessToken accessToken = credentialsDetails.getAccessToken(); + final String endpointSuffix = credentialsDetails.getEndpointSuffix(); + final boolean useManagedIdentity = credentialsDetails.getUseManagedIdentity(); + final String managedIdentityClientId = credentialsDetails.getManagedIdentityClientId(); + final String servicePrincipalTenantId = credentialsDetails.getServicePrincipalTenantId(); + final String servicePrincipalClientId = credentialsDetails.getServicePrincipalClientId(); + final String servicePrincipalClientSecret = credentialsDetails.getServicePrincipalClientSecret(); + + final String endpoint = String.format("https://%s.%s", accountName, endpointSuffix); + + final DataLakeServiceClientBuilder dataLakeServiceClientBuilder = new DataLakeServiceClientBuilder(); + dataLakeServiceClientBuilder.endpoint(endpoint); + + if (StringUtils.isNotBlank(accountKey)) { + final StorageSharedKeyCredential credential = new StorageSharedKeyCredential(accountName, accountKey); + dataLakeServiceClientBuilder.credential(credential); + } else if (StringUtils.isNotBlank(sasToken)) { + dataLakeServiceClientBuilder.sasToken(sasToken); + } else if (accessToken != null) { + final TokenCredential credential = tokenRequestContext -> Mono.just(accessToken); + dataLakeServiceClientBuilder.credential(credential); + } else if (useManagedIdentity) { + final ManagedIdentityCredential misCredential = new ManagedIdentityCredentialBuilder() + .clientId(managedIdentityClientId) + .build(); + dataLakeServiceClientBuilder.credential(misCredential); + } else if (StringUtils.isNoneBlank(servicePrincipalTenantId, servicePrincipalClientId, servicePrincipalClientSecret)) { + final ClientSecretCredential credential = new ClientSecretCredentialBuilder() + .tenantId(servicePrincipalTenantId) + .clientId(servicePrincipalClientId) + .clientSecret(servicePrincipalClientSecret) + .build(); + dataLakeServiceClientBuilder.credential(credential); + } else { + throw new IllegalArgumentException("No valid credentials were provided"); + } + + final NettyAsyncHttpClientBuilder nettyClientBuilder = new NettyAsyncHttpClientBuilder(); + nettyClientBuilder.proxy(proxyOptions); + + final HttpClient nettyClient = nettyClientBuilder.build(); + dataLakeServiceClientBuilder.httpClient(nettyClient); + + return dataLakeServiceClientBuilder.buildClient(); + } +} diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITListAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITListAzureDataLakeStorage.java index 978fe9433f..26ee97ee65 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITListAzureDataLakeStorage.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITListAzureDataLakeStorage.java @@ -98,7 +98,7 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { } @Test - public void testListRootRecursive() throws Exception { + public void testListRootRecursive() { runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, ""); runProcessor(); @@ -131,7 +131,7 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { } @Test - public void testListRootNonRecursive() throws Exception { + public void testListRootNonRecursive() { runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, ""); runner.setProperty(ListAzureDataLakeStorage.RECURSE_SUBDIRECTORIES, "false"); @@ -152,7 +152,7 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { } @Test - public void testListSubdirectoryRecursive() throws Exception { + public void testListSubdirectoryRecursive() { runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "dir1"); runProcessor(); @@ -173,7 +173,7 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { } @Test - public void testListSubdirectoryNonRecursive() throws Exception { + public void testListSubdirectoryNonRecursive() { runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "dir1"); runner.setProperty(ListAzureDataLakeStorage.RECURSE_SUBDIRECTORIES, "false"); @@ -194,7 +194,7 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { } @Test - public void testListWithFileFilter() throws Exception { + public void testListWithFileFilter() { runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, ""); runner.setProperty(ListAzureDataLakeStorage.FILE_FILTER, ".*file1.*$"); @@ -218,7 +218,7 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { } @Test - public void testListWithFileFilterWithEL() throws Exception { + public void testListWithFileFilterWithEL() { runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, ""); runner.setProperty(ListAzureDataLakeStorage.FILE_FILTER, ".*file${suffix}$"); runner.setVariable("suffix", "1.*"); @@ -244,7 +244,7 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { } @Test - public void testListRootWithPathFilter() throws Exception { + public void testListRootWithPathFilter() { runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, ""); runner.setProperty(ListAzureDataLakeStorage.PATH_FILTER, "^dir1.*$"); @@ -267,7 +267,7 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { } @Test - public void testListRootWithPathFilterWithEL() throws Exception { + public void testListRootWithPathFilterWithEL() { runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, ""); runner.setProperty(ListAzureDataLakeStorage.PATH_FILTER, "${prefix}${suffix}"); runner.setVariable("prefix", "^dir"); @@ -294,7 +294,7 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { } @Test - public void testListSubdirectoryWithPathFilter() throws Exception { + public void testListSubdirectoryWithPathFilter() { runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "dir1"); runner.setProperty(ListAzureDataLakeStorage.PATH_FILTER, "dir1.*"); @@ -315,7 +315,7 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { } @Test - public void testListRootWithFileAndPathFilter() throws Exception { + public void testListRootWithFileAndPathFilter() { runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, ""); runner.setProperty(ListAzureDataLakeStorage.FILE_FILTER, ".*11"); runner.setProperty(ListAzureDataLakeStorage.PATH_FILTER, "dir1.*"); @@ -339,7 +339,7 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { } @Test - public void testListEmptyDirectory() throws Exception { + public void testListEmptyDirectory() { runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "dir3"); runProcessor(); @@ -401,7 +401,7 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { } @Test - public void testListWithMinAge() throws Exception { + public void testListWithMinAge() { runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, ""); runner.setProperty(ListAzureDataLakeStorage.MIN_AGE, "1 hour"); @@ -422,7 +422,7 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { } @Test - public void testListWithMaxAge() throws Exception { + public void testListWithMaxAge() { runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, ""); runner.setProperty(ListAzureDataLakeStorage.MAX_AGE, "1 hour"); @@ -447,7 +447,7 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { } @Test - public void testListWithMinSize() throws Exception { + public void testListWithMinSize() { runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, ""); runner.setProperty(ListAzureDataLakeStorage.MIN_SIZE, "5 B"); @@ -471,7 +471,7 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { } @Test - public void testListWithMaxSize() throws Exception { + public void testListWithMaxSize() { runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, ""); runner.setProperty(ListAzureDataLakeStorage.MAX_SIZE, "5 B"); @@ -496,7 +496,7 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { runner.run(); } - private void assertSuccess(String... testFilePaths) throws Exception { + private void assertSuccess(String... testFilePaths) { runner.assertTransferCount(ListAzureDataLakeStorage.REL_SUCCESS, testFilePaths.length); Map expectedFiles = new HashMap<>(testFiles); diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/utils/DataLakeServiceClientFactoryTest.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/utils/DataLakeServiceClientFactoryTest.java new file mode 100644 index 0000000000..8576553d28 --- /dev/null +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/utils/DataLakeServiceClientFactoryTest.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.azure.storage.utils; + +import com.azure.storage.file.datalake.DataLakeServiceClient; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.services.azure.storage.ADLSCredentialsDetails; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import static org.junit.jupiter.api.Assertions.assertNotSame; +import static org.junit.jupiter.api.Assertions.assertSame; + +@ExtendWith(MockitoExtension.class) +class DataLakeServiceClientFactoryTest { + + @Mock + private ComponentLog logger; + + @Test + void testThatServiceClientIsCachedByCredentials() { + final DataLakeServiceClientFactory clientFactory = new DataLakeServiceClientFactory(logger, null); + + final ADLSCredentialsDetails credentials = createCredentialDetails("account"); + + final DataLakeServiceClient clientOne = clientFactory.getStorageClient(credentials); + final DataLakeServiceClient clientTwo = clientFactory.getStorageClient(credentials); + + assertSame(clientOne, clientTwo); + } + + @Test + void testThatDifferentServiceClientIsReturnedForDifferentCredentials() { + final DataLakeServiceClientFactory clientFactory = new DataLakeServiceClientFactory(logger, null); + + final ADLSCredentialsDetails credentialsOne = createCredentialDetails("accountOne"); + final ADLSCredentialsDetails credentialsTwo = createCredentialDetails("accountTwo"); + + final DataLakeServiceClient clientOne = clientFactory.getStorageClient(credentialsOne); + final DataLakeServiceClient clientTwo = clientFactory.getStorageClient(credentialsTwo); + + assertNotSame(clientOne, clientTwo); + } + + @Test + void testThatCachedClientIsReturnedAfterDifferentClientIsCreated() { + final DataLakeServiceClientFactory clientFactory = new DataLakeServiceClientFactory(logger, null); + + final ADLSCredentialsDetails credentialsOne = createCredentialDetails("accountOne"); + final ADLSCredentialsDetails credentialsTwo = createCredentialDetails("accountTwo"); + final ADLSCredentialsDetails credentialsThree = createCredentialDetails("accountOne"); + + final DataLakeServiceClient clientOne = clientFactory.getStorageClient(credentialsOne); + final DataLakeServiceClient clientTwo = clientFactory.getStorageClient(credentialsTwo); + final DataLakeServiceClient clientThree = clientFactory.getStorageClient(credentialsThree); + + assertNotSame(clientOne, clientTwo); + assertSame(clientOne, clientThree); + } + + private ADLSCredentialsDetails createCredentialDetails(String accountName) { + return ADLSCredentialsDetails.Builder.newBuilder() + .setAccountName(accountName) + .setAccountKey("accountKey") + .setEndpointSuffix("dfs.core.windows.net") + .build(); + } +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/storage/ADLSCredentialsDetails.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/storage/ADLSCredentialsDetails.java index eb3b1237c0..0a831161e5 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/storage/ADLSCredentialsDetails.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/storage/ADLSCredentialsDetails.java @@ -18,6 +18,8 @@ package org.apache.nifi.services.azure.storage; import com.azure.core.credential.AccessToken; +import java.util.Objects; + public class ADLSCredentialsDetails { private final String accountName; @@ -98,6 +100,45 @@ public class ADLSCredentialsDetails { return servicePrincipalClientSecret; } + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + + if (o == null || getClass() != o.getClass()) { + return false; + } + + ADLSCredentialsDetails that = (ADLSCredentialsDetails) o; + return useManagedIdentity == that.useManagedIdentity + && Objects.equals(accountName, that.accountName) + && Objects.equals(accountKey, that.accountKey) + && Objects.equals(sasToken, that.sasToken) + && Objects.equals(endpointSuffix, that.endpointSuffix) + && Objects.equals(accessToken, that.accessToken) + && Objects.equals(managedIdentityClientId, that.managedIdentityClientId) + && Objects.equals(servicePrincipalTenantId, that.servicePrincipalTenantId) + && Objects.equals(servicePrincipalClientId, that.servicePrincipalClientId) + && Objects.equals(servicePrincipalClientSecret, that.servicePrincipalClientSecret); + } + + @Override + public int hashCode() { + return Objects.hash( + accountName, + accountKey, + sasToken, + endpointSuffix, + accessToken, + useManagedIdentity, + managedIdentityClientId, + servicePrincipalTenantId, + servicePrincipalClientId, + servicePrincipalClientSecret + ); + } + public static class Builder { private String accountName; private String accountKey;