NIFI-11586 Added AzureStorageCredentialsControllerServiceLookup_v12

Also added client caching in blob v_12 processors which was needed to support multiple credentials provided by the new lookup service

This closes #7300

Signed-off-by: Nandor Soma Abonyi <nsabonyi@apache.org>
This commit is contained in:
Peter Turcsanyi 2023-05-23 22:50:47 +02:00 committed by Nandor Soma Abonyi
parent 51a941ae0a
commit 963518d943
No known key found for this signature in database
GPG Key ID: AFFFD8C3A1A88ED7
18 changed files with 513 additions and 118 deletions

View File

@ -16,29 +16,22 @@
*/
package org.apache.nifi.processors.azure;
import com.azure.core.credential.AzureSasCredential;
import com.azure.core.credential.TokenCredential;
import com.azure.core.http.HttpClient;
import com.azure.core.http.netty.NettyAsyncHttpClientBuilder;
import com.azure.identity.ClientSecretCredentialBuilder;
import com.azure.identity.ManagedIdentityCredentialBuilder;
import com.azure.storage.blob.BlobClient;
import com.azure.storage.blob.BlobServiceClient;
import com.azure.storage.blob.BlobServiceClientBuilder;
import com.azure.storage.blob.models.BlobProperties;
import com.azure.storage.common.StorageSharedKeyCredential;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.azure.storage.utils.BlobServiceClientFactory;
import org.apache.nifi.services.azure.storage.AzureStorageCredentialsDetails_v12;
import org.apache.nifi.services.azure.storage.AzureStorageCredentialsService_v12;
import reactor.core.publisher.Mono;
import java.util.Arrays;
import java.util.Collections;
@ -92,7 +85,7 @@ public abstract class AbstractAzureBlobProcessor_v12 extends AbstractProcessor {
REL_FAILURE
)));
private BlobServiceClient storageClient;
private volatile BlobServiceClientFactory clientFactory;
@Override
public Set<Relationship> getRelationships() {
@ -101,64 +94,23 @@ public abstract class AbstractAzureBlobProcessor_v12 extends AbstractProcessor {
@OnScheduled
public void onScheduled(ProcessContext context) {
storageClient = createStorageClient(context);
clientFactory = new BlobServiceClientFactory(getLogger(), getProxyOptions(context));
}
@OnStopped
public void onStopped() {
storageClient = null;
clientFactory = null;
}
protected BlobServiceClient getStorageClient() {
return storageClient;
}
protected BlobServiceClient getStorageClient(PropertyContext context, FlowFile flowFile) {
final Map<String, String> attributes = flowFile != null ? flowFile.getAttributes() : Collections.emptyMap();
public static BlobServiceClient createStorageClient(PropertyContext context) {
final AzureStorageCredentialsService_v12 credentialsService = context.getProperty(STORAGE_CREDENTIALS_SERVICE).asControllerService(AzureStorageCredentialsService_v12.class);
final AzureStorageCredentialsDetails_v12 credentialsDetails = credentialsService.getCredentialsDetails();
final AzureStorageCredentialsDetails_v12 credentialsDetails = credentialsService.getCredentialsDetails(attributes);
final BlobServiceClientBuilder clientBuilder = new BlobServiceClientBuilder();
clientBuilder.endpoint(String.format("https://%s.%s", credentialsDetails.getAccountName(), credentialsDetails.getEndpointSuffix()));
final BlobServiceClient storageClient = clientFactory.getStorageClient(credentialsDetails);
final NettyAsyncHttpClientBuilder nettyClientBuilder = new NettyAsyncHttpClientBuilder();
nettyClientBuilder.proxy(getProxyOptions(context));
final HttpClient nettyClient = nettyClientBuilder.build();
clientBuilder.httpClient(nettyClient);
configureCredential(clientBuilder, credentialsService, credentialsDetails);
return clientBuilder.buildClient();
}
private static void configureCredential(BlobServiceClientBuilder clientBuilder, AzureStorageCredentialsService_v12 credentialsService, AzureStorageCredentialsDetails_v12 credentialsDetails) {
switch (credentialsDetails.getCredentialsType()) {
case ACCOUNT_KEY:
clientBuilder.credential(new StorageSharedKeyCredential(credentialsDetails.getAccountName(), credentialsDetails.getAccountKey()));
break;
case SAS_TOKEN:
clientBuilder.credential(new AzureSasCredential(credentialsDetails.getSasToken()));
break;
case MANAGED_IDENTITY:
clientBuilder.credential(new ManagedIdentityCredentialBuilder()
.clientId(credentialsDetails.getManagedIdentityClientId())
.build());
break;
case SERVICE_PRINCIPAL:
clientBuilder.credential(new ClientSecretCredentialBuilder()
.tenantId(credentialsDetails.getServicePrincipalTenantId())
.clientId(credentialsDetails.getServicePrincipalClientId())
.clientSecret(credentialsDetails.getServicePrincipalClientSecret())
.build());
break;
case ACCESS_TOKEN:
TokenCredential credential = tokenRequestContext -> Mono.just(credentialsService.getCredentialsDetails().getAccessToken());
clientBuilder.credential(credential);
break;
default:
throw new IllegalArgumentException("Unhandled credentials type: " + credentialsDetails.getCredentialsType());
}
return storageClient;
}
protected Map<String, String> createBlobAttributesMap(BlobClient blobClient) {

View File

@ -95,7 +95,7 @@ public abstract class AbstractAzureDataLakeStorageProcessor extends AbstractProc
public static final String TEMP_FILE_DIRECTORY = "_nifitempdirectory";
private DataLakeServiceClientFactory clientFactory;
private volatile DataLakeServiceClientFactory clientFactory;
@Override
public Set<Relationship> getRelationships() {

View File

@ -101,7 +101,7 @@ public class DeleteAzureBlobStorage_v12 extends AbstractAzureBlobProcessor_v12 {
long startNanos = System.nanoTime();
try {
BlobServiceClient storageClient = getStorageClient();
BlobServiceClient storageClient = getStorageClient(context, flowFile);
BlobContainerClient containerClient = storageClient.getBlobContainerClient(containerName);
BlobClient blobClient = containerClient.getBlobClient(blobName);

View File

@ -152,7 +152,7 @@ public class FetchAzureBlobStorage_v12 extends AbstractAzureBlobProcessor_v12 im
Long rangeLength = (context.getProperty(RANGE_LENGTH).isSet() ? context.getProperty(RANGE_LENGTH).evaluateAttributeExpressions(flowFile).asDataSize(DataUnit.B).longValue() : null);
try {
BlobServiceClient storageClient = getStorageClient();
BlobServiceClient storageClient = getStorageClient(context, flowFile);
BlobContainerClient containerClient = storageClient.getBlobContainerClient(containerName);
final BlobClient blobClient;
if (isClientSideEncryptionEnabled(context)) {

View File

@ -46,8 +46,11 @@ import org.apache.nifi.processor.util.list.ListedEntityTracker;
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
import org.apache.nifi.processors.azure.storage.utils.BlobInfo;
import org.apache.nifi.processors.azure.storage.utils.BlobInfo.Builder;
import org.apache.nifi.processors.azure.storage.utils.BlobServiceClientFactory;
import org.apache.nifi.scheduling.SchedulingStrategy;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.services.azure.storage.AzureStorageCredentialsDetails_v12;
import org.apache.nifi.services.azure.storage.AzureStorageCredentialsService_v12;
import java.io.IOException;
import java.util.ArrayList;
@ -59,7 +62,7 @@ import java.util.List;
import java.util.Map;
import static org.apache.nifi.processors.azure.AbstractAzureBlobProcessor_v12.STORAGE_CREDENTIALS_SERVICE;
import static org.apache.nifi.processors.azure.AbstractAzureBlobProcessor_v12.createStorageClient;
import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.getProxyOptions;
import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_DESCRIPTION_BLOBNAME;
import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_DESCRIPTION_BLOBTYPE;
import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_DESCRIPTION_CONTAINER;
@ -148,7 +151,7 @@ public class ListAzureBlobStorage_v12 extends AbstractListAzureProcessor<BlobInf
AzureStorageUtils.PROXY_CONFIGURATION_SERVICE
));
private BlobServiceClient storageClient;
private volatile BlobServiceClientFactory clientFactory;
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
@ -157,12 +160,12 @@ public class ListAzureBlobStorage_v12 extends AbstractListAzureProcessor<BlobInf
@OnScheduled
public void onScheduled(ProcessContext context) {
storageClient = createStorageClient(context);
clientFactory = new BlobServiceClientFactory(getLogger(), getProxyOptions(context));
}
@OnStopped
public void onStopped() {
storageClient = null;
clientFactory = null;
}
@Override
@ -215,6 +218,10 @@ public class ListAzureBlobStorage_v12 extends AbstractListAzureProcessor<BlobInf
try {
final List<BlobInfo> listing = new ArrayList<>();
final AzureStorageCredentialsService_v12 credentialsService = context.getProperty(STORAGE_CREDENTIALS_SERVICE).asControllerService(AzureStorageCredentialsService_v12.class);
final AzureStorageCredentialsDetails_v12 credentialsDetails = credentialsService.getCredentialsDetails(Collections.emptyMap());
final BlobServiceClient storageClient = clientFactory.getStorageClient(credentialsDetails);
final BlobContainerClient containerClient = storageClient.getBlobContainerClient(containerName);
final ListBlobsOptions options = new ListBlobsOptions()

View File

@ -175,7 +175,7 @@ public class ListAzureDataLakeStorage extends AbstractListAzureProcessor<ADLSFil
private volatile Pattern filePattern;
private volatile Pattern pathPattern;
private DataLakeServiceClientFactory clientFactory;
private volatile DataLakeServiceClientFactory clientFactory;
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {

View File

@ -160,7 +160,7 @@ public class PutAzureBlobStorage_v12 extends AbstractAzureBlobProcessor_v12 impl
long startNanos = System.nanoTime();
try {
BlobServiceClient storageClient = getStorageClient();
BlobServiceClient storageClient = getStorageClient(context, flowFile);
BlobContainerClient containerClient = storageClient.getBlobContainerClient(containerName);
if (createContainer && !containerClient.exists()) {
containerClient.create();

View File

@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.azure.storage.utils;
import com.azure.core.http.ProxyOptions;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import org.apache.nifi.logging.ComponentLog;
abstract class AbstractStorageClientFactory<CREDENTIAL, CLIENT> {
private static final long STORAGE_CLIENT_CACHE_SIZE = 10;
private final ComponentLog logger;
private final ProxyOptions proxyOptions;
private final Cache<CREDENTIAL, CLIENT> clientCache;
protected AbstractStorageClientFactory(final ComponentLog logger, final ProxyOptions proxyOptions) {
this.logger = logger;
this.proxyOptions = proxyOptions;
this.clientCache = createCache();
}
private Cache<CREDENTIAL, CLIENT> createCache() {
// Beware! By default, Caffeine does not perform cleanup and evict values
// "automatically" or instantly after a value expires. Because of that it
// can happen that there are more elements in the cache than the maximum size.
// See: https://github.com/ben-manes/caffeine/wiki/Cleanup
return Caffeine.newBuilder()
.maximumSize(STORAGE_CLIENT_CACHE_SIZE)
.build();
}
/**
* Retrieves storage client object
*
* @param credentialsDetails used for caching because it can contain properties that are results of an expression
* @return CLIENT
*/
public CLIENT getStorageClient(final CREDENTIAL credentialsDetails) {
return clientCache.get(credentialsDetails, __ -> {
logger.debug(credentialsDetails.getClass().getSimpleName() + " is not found in the cache with the given credentials. Creating it.");
return createStorageClient(credentialsDetails, proxyOptions);
});
}
protected abstract CLIENT createStorageClient(CREDENTIAL credentialsDetails, ProxyOptions proxyOptions);
}

View File

@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.azure.storage.utils;
import com.azure.core.credential.AzureSasCredential;
import com.azure.core.credential.TokenCredential;
import com.azure.core.http.ProxyOptions;
import com.azure.core.util.ClientOptions;
import com.azure.core.util.HttpClientOptions;
import com.azure.identity.ClientSecretCredentialBuilder;
import com.azure.identity.ManagedIdentityCredentialBuilder;
import com.azure.storage.blob.BlobServiceClient;
import com.azure.storage.blob.BlobServiceClientBuilder;
import com.azure.storage.common.StorageSharedKeyCredential;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.services.azure.storage.AzureStorageCredentialsDetails_v12;
import reactor.core.publisher.Mono;
public class BlobServiceClientFactory extends AbstractStorageClientFactory<AzureStorageCredentialsDetails_v12, BlobServiceClient> {
public BlobServiceClientFactory(final ComponentLog logger, final ProxyOptions proxyOptions) {
super(logger, proxyOptions);
}
protected BlobServiceClient createStorageClient(final AzureStorageCredentialsDetails_v12 credentialsDetails, final ProxyOptions proxyOptions) {
final BlobServiceClientBuilder clientBuilder = new BlobServiceClientBuilder();
clientBuilder.endpoint(String.format("https://%s.%s", credentialsDetails.getAccountName(), credentialsDetails.getEndpointSuffix()));
final ClientOptions clientOptions = new HttpClientOptions().setProxyOptions(proxyOptions);
clientBuilder.clientOptions(clientOptions);
configureCredential(clientBuilder, credentialsDetails);
return clientBuilder.buildClient();
}
private void configureCredential(final BlobServiceClientBuilder clientBuilder, final AzureStorageCredentialsDetails_v12 credentialsDetails) {
switch (credentialsDetails.getCredentialsType()) {
case ACCOUNT_KEY:
clientBuilder.credential(new StorageSharedKeyCredential(credentialsDetails.getAccountName(), credentialsDetails.getAccountKey()));
break;
case SAS_TOKEN:
clientBuilder.credential(new AzureSasCredential(credentialsDetails.getSasToken()));
break;
case MANAGED_IDENTITY:
clientBuilder.credential(new ManagedIdentityCredentialBuilder()
.clientId(credentialsDetails.getManagedIdentityClientId())
.build());
break;
case SERVICE_PRINCIPAL:
clientBuilder.credential(new ClientSecretCredentialBuilder()
.tenantId(credentialsDetails.getServicePrincipalTenantId())
.clientId(credentialsDetails.getServicePrincipalClientId())
.clientSecret(credentialsDetails.getServicePrincipalClientSecret())
.build());
break;
case ACCESS_TOKEN:
TokenCredential credential = tokenRequestContext -> Mono.just(credentialsDetails.getAccessToken());
clientBuilder.credential(credential);
break;
default:
throw new IllegalArgumentException("Unhandled credentials type: " + credentialsDetails.getCredentialsType());
}
}
}

View File

@ -18,9 +18,9 @@ package org.apache.nifi.processors.azure.storage.utils;
import com.azure.core.credential.AccessToken;
import com.azure.core.credential.TokenCredential;
import com.azure.core.http.HttpClient;
import com.azure.core.http.ProxyOptions;
import com.azure.core.http.netty.NettyAsyncHttpClientBuilder;
import com.azure.core.util.ClientOptions;
import com.azure.core.util.HttpClientOptions;
import com.azure.identity.ClientSecretCredential;
import com.azure.identity.ClientSecretCredentialBuilder;
import com.azure.identity.ManagedIdentityCredential;
@ -28,52 +28,18 @@ import com.azure.identity.ManagedIdentityCredentialBuilder;
import com.azure.storage.common.StorageSharedKeyCredential;
import com.azure.storage.file.datalake.DataLakeServiceClient;
import com.azure.storage.file.datalake.DataLakeServiceClientBuilder;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.services.azure.storage.ADLSCredentialsDetails;
import reactor.core.publisher.Mono;
public class DataLakeServiceClientFactory {
private static final long STORAGE_CLIENT_CACHE_SIZE = 10;
private final ComponentLog logger;
private final ProxyOptions proxyOptions;
private final Cache<ADLSCredentialsDetails, DataLakeServiceClient> clientCache;
public class DataLakeServiceClientFactory extends AbstractStorageClientFactory<ADLSCredentialsDetails, DataLakeServiceClient> {
public DataLakeServiceClientFactory(ComponentLog logger, ProxyOptions proxyOptions) {
this.logger = logger;
this.proxyOptions = proxyOptions;
this.clientCache = createCache();
super(logger, proxyOptions);
}
private Cache<ADLSCredentialsDetails, DataLakeServiceClient> createCache() {
// Beware! By default, Caffeine does not perform cleanup and evict values
// "automatically" or instantly after a value expires. Because of that it
// can happen that there are more elements in the cache than the maximum size.
// See: https://github.com/ben-manes/caffeine/wiki/Cleanup
return Caffeine.newBuilder()
.maximumSize(STORAGE_CLIENT_CACHE_SIZE)
.build();
}
/**
* Retrieves a {@link DataLakeServiceClient}
*
* @param credentialsDetails used for caching because it can contain properties that are results of an expression
* @return DataLakeServiceClient
*/
public DataLakeServiceClient getStorageClient(ADLSCredentialsDetails credentialsDetails) {
return clientCache.get(credentialsDetails, __ -> {
logger.debug("DataLakeServiceClient is not found in the cache with the given credentials. Creating it.");
return createStorageClient(credentialsDetails, proxyOptions);
});
}
private static DataLakeServiceClient createStorageClient(ADLSCredentialsDetails credentialsDetails, ProxyOptions proxyOptions) {
protected DataLakeServiceClient createStorageClient(ADLSCredentialsDetails credentialsDetails, ProxyOptions proxyOptions) {
final String accountName = credentialsDetails.getAccountName();
final String accountKey = credentialsDetails.getAccountKey();
final String sasToken = credentialsDetails.getSasToken();
@ -114,11 +80,8 @@ public class DataLakeServiceClientFactory {
throw new IllegalArgumentException("No valid credentials were provided");
}
final NettyAsyncHttpClientBuilder nettyClientBuilder = new NettyAsyncHttpClientBuilder();
nettyClientBuilder.proxy(proxyOptions);
final HttpClient nettyClient = nettyClientBuilder.build();
dataLakeServiceClientBuilder.httpClient(nettyClient);
final ClientOptions clientOptions = new HttpClientOptions().setProxyOptions(proxyOptions);
dataLakeServiceClientBuilder.clientOptions(clientOptions);
return dataLakeServiceClientBuilder.buildClient();
}

View File

@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.services.azure.storage;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.service.lookup.AbstractSingleAttributeBasedControllerServiceLookup;
import java.util.Map;
@Tags({ "azure", "microsoft", "cloud", "storage", "blob", "queue", "credentials" })
@CapabilityDescription("Provides an AzureStorageCredentialsService_v12 that can be used to dynamically select another AzureStorageCredentialsService_v12. " +
"This service requires an attribute named 'azure.storage.credentials.name' to be passed in, and will throw an exception if the attribute is missing. " +
"The value of 'azure.storage.credentials.name' will be used to select the AzureStorageCredentialsService_v12 that has been registered with that name. " +
"This will allow multiple AzureStorageCredentialsServices_v12 to be defined and registered, and then selected dynamically at runtime by tagging flow files " +
"with the appropriate 'azure.storage.credentials.name' attribute.")
@DynamicProperty(name = "The name to register AzureStorageCredentialsService_v12", value = "The AzureStorageCredentialsService_v12",
description = "If '" + AzureStorageCredentialsControllerServiceLookup_v12.AZURE_STORAGE_CREDENTIALS_NAME_ATTRIBUTE + "' attribute contains " +
"the name of the dynamic property, then the AzureStorageCredentialsService_v12 (registered in the value) will be selected.",
expressionLanguageScope = ExpressionLanguageScope.NONE)
public class AzureStorageCredentialsControllerServiceLookup_v12
extends AbstractSingleAttributeBasedControllerServiceLookup<AzureStorageCredentialsService_v12> implements AzureStorageCredentialsService_v12 {
public static final String AZURE_STORAGE_CREDENTIALS_NAME_ATTRIBUTE = "azure.storage.credentials.name";
@Override
protected String getLookupAttribute() {
return AZURE_STORAGE_CREDENTIALS_NAME_ATTRIBUTE;
}
@Override
public Class<AzureStorageCredentialsService_v12> getServiceType() {
return AzureStorageCredentialsService_v12.class;
}
@Override
public AzureStorageCredentialsDetails_v12 getCredentialsDetails(final Map<String, String> attributes) {
return lookupService(attributes).getCredentialsDetails(attributes);
}
}

View File

@ -29,6 +29,7 @@ import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
/**
* Provides credentials details for Azure Blob processors
@ -133,7 +134,7 @@ public class AzureStorageCredentialsControllerService_v12 extends AbstractContro
}
@Override
public AzureStorageCredentialsDetails_v12 getCredentialsDetails() {
public AzureStorageCredentialsDetails_v12 getCredentialsDetails(Map<String, String> attributes) {
String accountName = context.getProperty(ACCOUNT_NAME).getValue();
String endpointSuffix = context.getProperty(ENDPOINT_SUFFIX).getValue();
AzureStorageCredentialsType credentialsType = AzureStorageCredentialsType.valueOf(context.getProperty(CREDENTIALS_TYPE).getValue());

View File

@ -20,4 +20,5 @@ org.apache.nifi.services.azure.storage.ADLSCredentialsControllerServiceLookup
org.apache.nifi.services.azure.cosmos.document.AzureCosmosDBClientService
org.apache.nifi.services.azure.storage.AzureStorageEmulatorCredentialsControllerService
org.apache.nifi.services.azure.storage.AzureStorageCredentialsControllerService_v12
org.apache.nifi.services.azure.storage.AzureStorageCredentialsControllerServiceLookup_v12
org.apache.nifi.services.azure.StandardAzureCredentialsControllerService

View File

@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.azure.storage.utils;
import com.azure.storage.blob.BlobServiceClient;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.services.azure.storage.AzureStorageCredentialsDetails_v12;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import static org.junit.jupiter.api.Assertions.assertNotSame;
import static org.junit.jupiter.api.Assertions.assertSame;
@ExtendWith(MockitoExtension.class)
class BlobServiceClientFactoryTest {
@Mock
private ComponentLog logger;
@Test
void testThatServiceClientIsCachedByCredentials() {
final BlobServiceClientFactory clientFactory = new BlobServiceClientFactory(logger, null);
final AzureStorageCredentialsDetails_v12 credentials = createCredentialDetails("account");
final BlobServiceClient clientOne = clientFactory.getStorageClient(credentials);
final BlobServiceClient clientTwo = clientFactory.getStorageClient(credentials);
assertSame(clientOne, clientTwo);
}
@Test
void testThatDifferentServiceClientIsReturnedForDifferentCredentials() {
final BlobServiceClientFactory clientFactory = new BlobServiceClientFactory(logger, null);
final AzureStorageCredentialsDetails_v12 credentialsOne = createCredentialDetails("accountOne");
final AzureStorageCredentialsDetails_v12 credentialsTwo = createCredentialDetails("accountTwo");
final BlobServiceClient clientOne = clientFactory.getStorageClient(credentialsOne);
final BlobServiceClient clientTwo = clientFactory.getStorageClient(credentialsTwo);
assertNotSame(clientOne, clientTwo);
}
@Test
void testThatCachedClientIsReturnedAfterDifferentClientIsCreated() {
final BlobServiceClientFactory clientFactory = new BlobServiceClientFactory(logger, null);
final AzureStorageCredentialsDetails_v12 credentialsOne = createCredentialDetails("accountOne");
final AzureStorageCredentialsDetails_v12 credentialsTwo = createCredentialDetails("accountTwo");
final AzureStorageCredentialsDetails_v12 credentialsThree = createCredentialDetails("accountOne");
final BlobServiceClient clientOne = clientFactory.getStorageClient(credentialsOne);
final BlobServiceClient clientTwo = clientFactory.getStorageClient(credentialsTwo);
final BlobServiceClient clientThree = clientFactory.getStorageClient(credentialsThree);
assertNotSame(clientOne, clientTwo);
assertSame(clientOne, clientThree);
}
private AzureStorageCredentialsDetails_v12 createCredentialDetails(String accountName) {
return AzureStorageCredentialsDetails_v12.createWithAccountKey(accountName, "dfs.core.windows.net", "accountKey");
}
}

View File

@ -0,0 +1,147 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.services.azure.storage;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.NoOpProcessor;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.util.HashMap;
import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
public class TestAzureStorageCredentialsControllerServiceLookup_v12 {
private MockAzureStorageCredentialsService serviceA;
private MockAzureStorageCredentialsService serviceB;
private AzureStorageCredentialsControllerServiceLookup_v12 lookupService;
private TestRunner runner;
@BeforeEach
public void setup() throws InitializationException {
serviceA = new MockAzureStorageCredentialsService(AzureStorageCredentialsDetails_v12.createWithAccountKey("Account_Name_A", "core.windows.net", "Account_Key"));
serviceB = new MockAzureStorageCredentialsService(AzureStorageCredentialsDetails_v12.createWithSasToken("Account_Name_B", "core.windows.net", "SAS_Token"));
lookupService = new AzureStorageCredentialsControllerServiceLookup_v12();
runner = TestRunners.newTestRunner(NoOpProcessor.class);
final String serviceAIdentifier = "service-a";
runner.addControllerService(serviceAIdentifier, serviceA);
final String serviceBIdentifier = "service-b";
runner.addControllerService(serviceBIdentifier, serviceB);
runner.addControllerService("lookup-service", lookupService);
runner.setProperty(lookupService, "a", serviceAIdentifier);
runner.setProperty(lookupService, "b", serviceBIdentifier);
runner.enableControllerService(serviceA);
runner.enableControllerService(serviceB);
runner.enableControllerService(lookupService);
}
@Test
public void testLookupServiceA() {
final Map<String,String> attributes = new HashMap<>();
attributes.put(AzureStorageCredentialsControllerServiceLookup_v12.AZURE_STORAGE_CREDENTIALS_NAME_ATTRIBUTE, "a");
final AzureStorageCredentialsDetails_v12 storageCredentialsDetails = lookupService.getCredentialsDetails(attributes);
assertNotNull(storageCredentialsDetails);
assertEquals("Account_Name_A", storageCredentialsDetails.getAccountName());
assertEquals("core.windows.net", storageCredentialsDetails.getEndpointSuffix());
assertEquals("Account_Key", storageCredentialsDetails.getAccountKey());
assertNull(storageCredentialsDetails.getSasToken());
}
@Test
public void testLookupServiceB() {
final Map<String, String> attributes = new HashMap<>();
attributes.put(AzureStorageCredentialsControllerServiceLookup_v12.AZURE_STORAGE_CREDENTIALS_NAME_ATTRIBUTE, "b");
final AzureStorageCredentialsDetails_v12 storageCredentialsDetails = lookupService.getCredentialsDetails(attributes);
assertNotNull(storageCredentialsDetails);
assertEquals("Account_Name_B", storageCredentialsDetails.getAccountName());
assertEquals("core.windows.net", storageCredentialsDetails.getEndpointSuffix());
assertEquals("SAS_Token", storageCredentialsDetails.getSasToken());
assertNull(storageCredentialsDetails.getAccountKey());
}
@Test
public void testLookupMissingCredentialsNameAttribute() {
final Map<String, String> attributes = new HashMap<>();
assertThrows(ProcessException.class, () -> lookupService.getCredentialsDetails(attributes));
}
@Test
public void testLookupWithCredentialsNameThatDoesNotExist() {
final Map<String, String> attributes = new HashMap<>();
attributes.put(AzureStorageCredentialsControllerServiceLookup_v12.AZURE_STORAGE_CREDENTIALS_NAME_ATTRIBUTE, "DOES-NOT-EXIST");
assertThrows(ProcessException.class, () -> lookupService.getCredentialsDetails(attributes));
}
@Test
public void testCustomValidateAtLeaseOneServiceDefined() throws InitializationException {
// enable lookup service with no services registered, verify not valid
runner = TestRunners.newTestRunner(NoOpProcessor.class);
runner.addControllerService("lookup-service", lookupService);
runner.assertNotValid(lookupService);
final String serviceAIdentifier = "service-a";
runner.addControllerService(serviceAIdentifier, serviceA);
// register a service and now verify valid
runner.setProperty(lookupService, "a", serviceAIdentifier);
runner.enableControllerService(lookupService);
runner.assertValid(lookupService);
}
@Test
public void testCustomValidateSelfReferenceNotAllowed() throws InitializationException {
runner = TestRunners.newTestRunner(NoOpProcessor.class);
runner.addControllerService("lookup-service", lookupService);
runner.setProperty(lookupService, "lookup-service", "lookup-service");
runner.assertNotValid(lookupService);
}
/**
* A mock AzureStorageCredentialsService_v12 that will always return the passed in AzureStorageCredentialsDetails_v12.
*/
private static class MockAzureStorageCredentialsService extends AbstractControllerService implements AzureStorageCredentialsService_v12 {
private final AzureStorageCredentialsDetails_v12 storageCredentialsDetails;
MockAzureStorageCredentialsService(AzureStorageCredentialsDetails_v12 storageCredentialsDetails) {
this.storageCredentialsDetails = storageCredentialsDetails;
}
@Override
public AzureStorageCredentialsDetails_v12 getCredentialsDetails(Map<String, String> attributes) {
return storageCredentialsDetails;
}
}
}

View File

@ -23,6 +23,8 @@ import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.util.Collections;
import static org.apache.nifi.processors.azure.AzureServiceEndpoints.DEFAULT_BLOB_ENDPOINT_SUFFIX;
import static org.apache.nifi.services.azure.storage.AzureStorageCredentialsControllerService_v12.ACCOUNT_NAME;
import static org.apache.nifi.services.azure.storage.AzureStorageCredentialsControllerService_v12.CREDENTIALS_TYPE;
@ -158,7 +160,7 @@ public class TestAzureStorageCredentialsControllerService_v12 {
runner.enableControllerService(credentialsService);
AzureStorageCredentialsDetails_v12 actual = credentialsService.getCredentialsDetails();
AzureStorageCredentialsDetails_v12 actual = credentialsService.getCredentialsDetails(Collections.emptyMap());
assertEquals(ACCOUNT_NAME_VALUE, actual.getAccountName());
assertEquals(DEFAULT_BLOB_ENDPOINT_SUFFIX, actual.getEndpointSuffix());
@ -178,7 +180,7 @@ public class TestAzureStorageCredentialsControllerService_v12 {
runner.enableControllerService(credentialsService);
AzureStorageCredentialsDetails_v12 actual = credentialsService.getCredentialsDetails();
AzureStorageCredentialsDetails_v12 actual = credentialsService.getCredentialsDetails(Collections.emptyMap());
assertEquals(ACCOUNT_NAME_VALUE, actual.getAccountName());
assertEquals(DEFAULT_BLOB_ENDPOINT_SUFFIX, actual.getEndpointSuffix());
@ -197,7 +199,7 @@ public class TestAzureStorageCredentialsControllerService_v12 {
runner.enableControllerService(credentialsService);
AzureStorageCredentialsDetails_v12 actual = credentialsService.getCredentialsDetails();
AzureStorageCredentialsDetails_v12 actual = credentialsService.getCredentialsDetails(Collections.emptyMap());
assertEquals(ACCOUNT_NAME_VALUE, actual.getAccountName());
assertEquals(DEFAULT_BLOB_ENDPOINT_SUFFIX, actual.getEndpointSuffix());
@ -219,7 +221,7 @@ public class TestAzureStorageCredentialsControllerService_v12 {
runner.enableControllerService(credentialsService);
AzureStorageCredentialsDetails_v12 actual = credentialsService.getCredentialsDetails();
AzureStorageCredentialsDetails_v12 actual = credentialsService.getCredentialsDetails(Collections.emptyMap());
assertEquals(ACCOUNT_NAME_VALUE, actual.getAccountName());
assertEquals(DEFAULT_BLOB_ENDPOINT_SUFFIX, actual.getEndpointSuffix());
@ -240,7 +242,7 @@ public class TestAzureStorageCredentialsControllerService_v12 {
runner.enableControllerService(credentialsService);
AzureStorageCredentialsDetails_v12 actual = credentialsService.getCredentialsDetails();
AzureStorageCredentialsDetails_v12 actual = credentialsService.getCredentialsDetails(Collections.emptyMap());
assertEquals(ENDPOINT_SUFFIX_VALUE, actual.getEndpointSuffix());
}

View File

@ -18,6 +18,8 @@ package org.apache.nifi.services.azure.storage;
import com.azure.core.credential.AccessToken;
import java.util.Objects;
public class AzureStorageCredentialsDetails_v12 {
private final String accountName;
@ -86,6 +88,45 @@ public class AzureStorageCredentialsDetails_v12 {
return accessToken;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
AzureStorageCredentialsDetails_v12 that = (AzureStorageCredentialsDetails_v12) o;
return credentialsType == that.credentialsType
&& Objects.equals(accountName, that.accountName)
&& Objects.equals(endpointSuffix, that.endpointSuffix)
&& Objects.equals(accountKey, that.accountKey)
&& Objects.equals(sasToken, that.sasToken)
&& Objects.equals(managedIdentityClientId, that.managedIdentityClientId)
&& Objects.equals(servicePrincipalTenantId, that.servicePrincipalTenantId)
&& Objects.equals(servicePrincipalClientId, that.servicePrincipalClientId)
&& Objects.equals(servicePrincipalClientSecret, that.servicePrincipalClientSecret)
&& Objects.equals(accessToken, that.accessToken);
}
@Override
public int hashCode() {
return Objects.hash(
credentialsType,
accountName,
endpointSuffix,
accountKey,
sasToken,
managedIdentityClientId,
servicePrincipalTenantId,
servicePrincipalClientId,
servicePrincipalClientSecret,
accessToken
);
}
public static AzureStorageCredentialsDetails_v12 createWithAccountKey(
String accountName,
String endpointSuffix,

View File

@ -18,6 +18,8 @@ package org.apache.nifi.services.azure.storage;
import org.apache.nifi.controller.ControllerService;
import java.util.Map;
/**
* Service interface to provide Azure credentials details for processors using Azure Storage Java v12 client library.
*/
@ -25,7 +27,8 @@ public interface AzureStorageCredentialsService_v12 extends ControllerService {
/**
* Get AzureStorageCredentialsDetails_v12 object which contains the Storage Account Name, the Storage Service Endpoint Suffix and the parameters of the Storage Credentials
* @param attributes FlowFile attributes (typically)
* @return AzureStorageCredentialsDetails_v12 object
*/
AzureStorageCredentialsDetails_v12 getCredentialsDetails();
AzureStorageCredentialsDetails_v12 getCredentialsDetails(Map<String, String> attributes);
}