NIFI-11439 Added Storage API URL property to GCS Processors

- Included Host Header override with Storage API URL based on Google Private Service Connect documentation

This closes #7172

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Paul Grey 2023-04-12 23:44:41 -04:00 committed by exceptionfactory
parent bc5f00a667
commit ee24df2830
No known key found for this signature in database
GPG Key ID: 29B6A52D2AAE8DBA
2 changed files with 43 additions and 1 deletions

View File

@ -17,18 +17,22 @@
package org.apache.nifi.processors.gcp.storage;
import com.google.api.gax.retrying.RetrySettings;
import com.google.api.gax.rpc.FixedHeaderProvider;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.BaseServiceException;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import com.google.common.collect.ImmutableMap;
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.VerifiableProcessor;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.gcp.AbstractGCPProcessor;
import org.apache.nifi.processors.gcp.ProxyAwareTransportFactory;
import org.apache.nifi.proxy.ProxyConfiguration;
@ -65,9 +69,21 @@ public abstract class AbstractGCSProcessor extends AbstractGCPProcessor<Storage,
return relationships;
}
// https://cloud.google.com/storage/docs/request-endpoints#storage-set-client-endpoint-java
public static final PropertyDescriptor STORAGE_API_URL = new PropertyDescriptor
.Builder().name("storage-api-url")
.displayName("Storage API URL")
.description("Overrides the default storage URL. Configuring an alternative Storage API URL also overrides the HTTP Host header on requests as described in the Google documentation for Private Service Connections.")
.addValidator(StandardValidators.URL_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.required(false)
.build();
@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return Collections.unmodifiableList(super.getSupportedPropertyDescriptors());
final List<PropertyDescriptor> propertyDescriptors = new ArrayList<>(super.getSupportedPropertyDescriptors());
propertyDescriptors.add(STORAGE_API_URL);
return Collections.unmodifiableList(propertyDescriptors);
}
@Override
@ -129,6 +145,7 @@ public abstract class AbstractGCSProcessor extends AbstractGCPProcessor<Storage,
@Override
protected StorageOptions getServiceOptions(ProcessContext context, GoogleCredentials credentials) {
final String projectId = context.getProperty(PROJECT_ID).evaluateAttributeExpressions().getValue();
final String storageApiUrl = context.getProperty(STORAGE_API_URL).evaluateAttributeExpressions().getValue();
final Integer retryCount = context.getProperty(RETRY_COUNT).asInteger();
StorageOptions.Builder storageOptionsBuilder = StorageOptions.newBuilder()
@ -141,6 +158,12 @@ public abstract class AbstractGCSProcessor extends AbstractGCPProcessor<Storage,
storageOptionsBuilder.setProjectId(projectId);
}
if (storageApiUrl != null && !storageApiUrl.isEmpty()) {
storageOptionsBuilder.setHost(storageApiUrl);
// https://codelabs.developers.google.com/cloudnet-psc#12
storageOptionsBuilder.setHeaderProvider(FixedHeaderProvider.create(ImmutableMap.of("Host", "www.googleapis.com")));
}
return storageOptionsBuilder.setTransportOptions(getTransportOptions(context)).build();
}
}

View File

@ -45,6 +45,7 @@ import static org.mockito.Mockito.reset;
@ExtendWith(MockitoExtension.class)
public abstract class AbstractGCSTest {
private static final String PROJECT_ID = System.getProperty("test.gcp.project.id", "nifi-test-gcp-project");
private static final String DEFAULT_STORAGE_URL = "https://storage.googleapis.com";
private static final Integer RETRIES = 9;
static final String BUCKET = RemoteStorageHelper.generateBucketName();
@ -89,9 +90,27 @@ public abstract class AbstractGCSTest {
mockCredentials);
assertEquals(PROJECT_ID, options.getProjectId(), "Project IDs should match");
assertEquals(DEFAULT_STORAGE_URL, options.getHost(), "Host URLs should match");
assertEquals(RETRIES.intValue(), options.getRetrySettings().getMaxAttempts(), "Retry counts should match");
assertSame(mockCredentials, options.getCredentials(), "Credentials should be configured correctly");
}
@Test
public void testStorageOptionsConfigurationHostOverride() throws Exception {
reset(storage);
final TestRunner runner = buildNewRunner(getProcessor());
final String overrideStorageApiUrl = "https://localhost";
runner.setProperty(AbstractGCSProcessor.STORAGE_API_URL, overrideStorageApiUrl);
final AbstractGCSProcessor processor = getProcessor();
final GoogleCredentials mockCredentials = mock(GoogleCredentials.class);
final StorageOptions options = processor.getServiceOptions(runner.getProcessContext(),
mockCredentials);
assertEquals(overrideStorageApiUrl, options.getHost(), "Host URLs should match");
}
}