NIFI-11439 Corrected GCS Transit URI for custom Storage API URL

This closes #7173

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Paul Grey 2023-04-13 18:36:31 -04:00 committed by exceptionfactory
parent cd685671c8
commit cff7808543
No known key found for this signature in database
GPG Key ID: 29B6A52D2AAE8DBA
5 changed files with 67 additions and 18 deletions

View File

@ -37,6 +37,7 @@ import org.apache.nifi.processors.gcp.AbstractGCPProcessor;
import org.apache.nifi.processors.gcp.ProxyAwareTransportFactory;
import org.apache.nifi.proxy.ProxyConfiguration;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@ -167,4 +168,9 @@ public abstract class AbstractGCSProcessor extends AbstractGCPProcessor<Storage,
return storageOptionsBuilder.setTransportOptions(getTransportOptions(context)).build();
}
/**
 * Builds the transit URI for an object, in the form
 * {@code scheme://bucket.host[:port]/key}, from the configured Storage API URL.
 *
 * <p>The scheme, host and (when explicitly present) port are taken from
 * {@code storageApiUrl}; any path, query or fragment on that URL is ignored.
 * The scheme and host are not validated: if the URL lacks either one, the
 * literal text {@code null} appears in its place in the result.
 *
 * @param storageApiUrl base URL of the Storage API, e.g. {@code https://storage.googleapis.com}
 * @param bucketName name of the bucket, used as the left-most host label
 * @param key object key, appended verbatim as the path
 * @return the formatted transit URI
 * @throws IllegalArgumentException if {@code storageApiUrl} is not a syntactically valid URI
 * @throws NullPointerException if {@code storageApiUrl} is {@code null}
 */
protected String getTransitUri(final String storageApiUrl, final String bucketName, final String key) {
    final URI apiUri = URI.create(storageApiUrl);
    // URI.getPort() returns -1 when the URL carries no explicit port; keep an
    // explicit port so custom endpoints (e.g. http://localhost:4443) are reported accurately.
    final int port = apiUri.getPort();
    final String portSuffix = port == -1 ? "" : ":" + port;
    return String.format("%s://%s.%s%s/%s", apiUri.getScheme(), bucketName, apiUri.getHost(), portSuffix, key);
}
}

View File

@ -273,7 +273,9 @@ public class FetchGCSObject extends AbstractGCSProcessor {
final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
getLogger().info("Successfully retrieved GCS Object for {} in {} millis; routing to success", new Object[]{flowFile, millis});
session.getProvenanceReporter().fetch(flowFile, "https://" + bucketName + ".storage.googleapis.com/" + key, millis);
final String transitUri = getTransitUri(storage.getOptions().getHost(), bucketName, key);
session.getProvenanceReporter().fetch(flowFile, transitUri, millis);
}
private FetchedBlob fetchBlob(final ProcessContext context, final Storage storage, final Map<String, String> attributes) throws IOException {

View File

@ -542,9 +542,9 @@ public class PutGCSObject extends AbstractGCSProcessor {
}
session.transfer(flowFile, REL_SUCCESS);
final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
final String url = "https://" + bucket + ".storage.googleapis.com/" + key;
session.getProvenanceReporter().send(flowFile, url, millis);
final String transitUri = getTransitUri(storage.getOptions().getHost(), bucket, key);
session.getProvenanceReporter().send(flowFile, transitUri, millis);
getLogger().info("Successfully put {} to Google Cloud Storage in {} milliseconds",
new Object[]{ff, millis});

View File

@ -24,6 +24,7 @@ import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageException;
import com.google.cloud.storage.StorageOptions;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.util.MockFlowFile;
@ -102,10 +103,14 @@ public class FetchGCSObjectTest extends AbstractGCSTest {
private static final String OWNER_DOMAIN = "test-owner-domain";
private static final String OWNER_PROJECT_ID = "test-owner-project-id";
private static final String URI = "test-uri";
private static final String STORAGE_API_URL = "https://localhost";
private static final String CONTENT_DISPOSITION = "attachment; filename=\"test-content-disposition.txt\"";
private static final Long CREATE_TIME = 1234L;
private static final Long UPDATE_TIME = 4567L;
@Mock
StorageOptions storageOptions;
@Mock
Storage storage;
@ -192,7 +197,9 @@ public class FetchGCSObjectTest extends AbstractGCSTest {
@Test
public void testSuccessfulFetch() throws Exception {
reset(storage);
reset(storageOptions, storage);
when(storageOptions.getHost()).thenReturn(STORAGE_API_URL);
when(storage.getOptions()).thenReturn(storageOptions);
final TestRunner runner = buildNewRunner(getProcessor());
addRequiredPropertiesToRunner(runner);
runner.assertValid();
@ -347,7 +354,9 @@ public class FetchGCSObjectTest extends AbstractGCSTest {
@Test
public void testAclOwnerUser() throws Exception {
reset(storage);
reset(storageOptions, storage);
when(storageOptions.getHost()).thenReturn(STORAGE_API_URL);
when(storage.getOptions()).thenReturn(storageOptions);
final TestRunner runner = buildNewRunner(getProcessor());
addRequiredPropertiesToRunner(runner);
runner.assertValid();
@ -387,7 +396,9 @@ public class FetchGCSObjectTest extends AbstractGCSTest {
@Test
public void testAclOwnerGroup() throws Exception {
reset(storage);
reset(storageOptions, storage);
when(storageOptions.getHost()).thenReturn(STORAGE_API_URL);
when(storage.getOptions()).thenReturn(storageOptions);
final TestRunner runner = buildNewRunner(getProcessor());
addRequiredPropertiesToRunner(runner);
runner.assertValid();
@ -429,7 +440,9 @@ public class FetchGCSObjectTest extends AbstractGCSTest {
@Test
public void testAclOwnerDomain() throws Exception {
reset(storage);
reset(storageOptions, storage);
when(storageOptions.getHost()).thenReturn(STORAGE_API_URL);
when(storage.getOptions()).thenReturn(storageOptions);
final TestRunner runner = buildNewRunner(getProcessor());
addRequiredPropertiesToRunner(runner);
runner.assertValid();
@ -471,7 +484,9 @@ public class FetchGCSObjectTest extends AbstractGCSTest {
@Test
public void testAclOwnerProject() throws Exception {
reset(storage);
reset(storageOptions, storage);
when(storageOptions.getHost()).thenReturn(STORAGE_API_URL);
when(storage.getOptions()).thenReturn(storageOptions);
final TestRunner runner = buildNewRunner(getProcessor());
addRequiredPropertiesToRunner(runner);
runner.assertValid();
@ -510,7 +525,9 @@ public class FetchGCSObjectTest extends AbstractGCSTest {
@Test
public void testBlobIdWithGeneration() throws Exception {
reset(storage);
reset(storageOptions, storage);
when(storageOptions.getHost()).thenReturn(STORAGE_API_URL);
when(storage.getOptions()).thenReturn(storageOptions);
final TestRunner runner = buildNewRunner(getProcessor());
addRequiredPropertiesToRunner(runner);
@ -568,7 +585,9 @@ public class FetchGCSObjectTest extends AbstractGCSTest {
@Test
public void testBlobIdWithEncryption() throws Exception {
reset(storage);
reset(storageOptions, storage);
when(storageOptions.getHost()).thenReturn(STORAGE_API_URL);
when(storage.getOptions()).thenReturn(storageOptions);
final TestRunner runner = buildNewRunner(getProcessor());
runner.setProperty(FetchGCSObject.ENCRYPTION_KEY, ENCRYPTION_SHA256);

View File

@ -21,6 +21,7 @@ import com.google.cloud.storage.Blob;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageException;
import com.google.cloud.storage.StorageOptions;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
@ -99,11 +100,15 @@ public class PutGCSObjectTest extends AbstractGCSTest {
private static final String OWNER_DOMAIN = "test-owner-domain";
private static final String OWNER_PROJECT_ID = "test-owner-project-id";
private static final String URI = "test-uri";
private static final String STORAGE_API_URL = "https://localhost";
private static final String CONTENT_DISPOSITION = "attachment; filename=\"" + FILENAME + "\"";
private static final Long CREATE_TIME = 1234L;
private static final Long UPDATE_TIME = 4567L;
private final static Long GENERATION = 5L;
@Mock
StorageOptions storageOptions;
@Mock
Storage storage;
@ -136,7 +141,9 @@ public class PutGCSObjectTest extends AbstractGCSTest {
@Test
public void testSuccessfulPutOperationNoParameters() throws Exception {
reset(storage, blob);
reset(storageOptions, storage, blob);
when(storageOptions.getHost()).thenReturn(STORAGE_API_URL);
when(storage.getOptions()).thenReturn(storageOptions);
final PutGCSObject processor = getProcessor();
final TestRunner runner = buildNewRunner(processor);
addRequiredPropertiesToRunner(runner);
@ -167,7 +174,9 @@ public class PutGCSObjectTest extends AbstractGCSTest {
@Test
public void testSuccessfulPutOperation() throws Exception {
reset(storage, blob);
reset(storageOptions, storage, blob);
when(storageOptions.getHost()).thenReturn(STORAGE_API_URL);
when(storage.getOptions()).thenReturn(storageOptions);
final PutGCSObject processor = getProcessor();
final TestRunner runner = buildNewRunner(processor);
addRequiredPropertiesToRunner(runner);
@ -245,7 +254,9 @@ public class PutGCSObjectTest extends AbstractGCSTest {
@Test
public void testSuccessfulPutOperationWithUserMetadata() throws Exception {
reset(storage, blob);
reset(storageOptions, storage, blob);
when(storageOptions.getHost()).thenReturn(STORAGE_API_URL);
when(storage.getOptions()).thenReturn(storageOptions);
final PutGCSObject processor = getProcessor();
final TestRunner runner = buildNewRunner(processor);
addRequiredPropertiesToRunner(runner);
@ -292,7 +303,10 @@ public class PutGCSObjectTest extends AbstractGCSTest {
@Test
public void testAttributesSetOnSuccessfulPut() throws Exception {
reset(storage, blob);
reset(storageOptions, storage, blob);
when(storageOptions.getHost()).thenReturn(STORAGE_API_URL);
when(storage.getOptions()).thenReturn(storageOptions);
final PutGCSObject processor = getProcessor();
final TestRunner runner = buildNewRunner(processor);
addRequiredPropertiesToRunner(runner);
@ -360,7 +374,9 @@ public class PutGCSObjectTest extends AbstractGCSTest {
@Test
public void testAclAttributeUser() throws Exception {
reset(storage, blob);
reset(storageOptions, storage, blob);
when(storageOptions.getHost()).thenReturn(STORAGE_API_URL);
when(storage.getOptions()).thenReturn(storageOptions);
final PutGCSObject processor = getProcessor();
final TestRunner runner = buildNewRunner(processor);
addRequiredPropertiesToRunner(runner);
@ -386,7 +402,9 @@ public class PutGCSObjectTest extends AbstractGCSTest {
@Test
public void testAclAttributeGroup() throws Exception {
reset(storage, blob);
reset(storageOptions, storage, blob);
when(storageOptions.getHost()).thenReturn(STORAGE_API_URL);
when(storage.getOptions()).thenReturn(storageOptions);
final PutGCSObject processor = getProcessor();
final TestRunner runner = buildNewRunner(processor);
addRequiredPropertiesToRunner(runner);
@ -413,7 +431,9 @@ public class PutGCSObjectTest extends AbstractGCSTest {
@Test
public void testAclAttributeDomain() throws Exception {
reset(storage, blob);
reset(storageOptions, storage, blob);
when(storageOptions.getHost()).thenReturn(STORAGE_API_URL);
when(storage.getOptions()).thenReturn(storageOptions);
final PutGCSObject processor = getProcessor();
final TestRunner runner = buildNewRunner(processor);
addRequiredPropertiesToRunner(runner);
@ -440,7 +460,9 @@ public class PutGCSObjectTest extends AbstractGCSTest {
@Test
public void testAclAttributeProject() throws Exception {
reset(storage, blob);
reset(storageOptions, storage, blob);
when(storageOptions.getHost()).thenReturn(STORAGE_API_URL);
when(storage.getOptions()).thenReturn(storageOptions);
final PutGCSObject processor = getProcessor();
final TestRunner runner = buildNewRunner(processor);
addRequiredPropertiesToRunner(runner);