diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/pom.xml b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/pom.xml index 36e6b7c4a1..70f104365e 100644 --- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/pom.xml +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/pom.xml @@ -69,6 +69,10 @@ 2.0.0-SNAPSHOT provided + + org.apache.nifi + nifi-file-resource-service-api + org.apache.nifi nifi-conflict-resolution
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/GCSFileResourceService.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/GCSFileResourceService.java
new file mode 100644
index 0000000000..0f99897fcd
--- /dev/null
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/GCSFileResourceService.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.gcp.storage;
+
+import com.google.auth.oauth2.GoogleCredentials;
+import com.google.cloud.ReadChannel;
+import com.google.cloud.storage.Blob;
+import com.google.cloud.storage.BlobId;
+import com.google.cloud.storage.Storage;
+import com.google.cloud.storage.StorageException;
+import com.google.cloud.storage.StorageOptions;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.documentation.UseCase;
+import org.apache.nifi.annotation.lifecycle.OnDisabled;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.context.PropertyContext;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.fileresource.service.api.FileResource;
+import org.apache.nifi.fileresource.service.api.FileResourceService;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.gcp.credentials.service.GCPCredentialsService;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.gcp.util.GoogleUtils;
+
+import java.io.IOException;
+import java.nio.channels.Channels;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.nifi.processors.gcp.storage.StorageAttributes.BUCKET_ATTR;
+import static org.apache.nifi.processors.gcp.storage.StorageAttributes.BUCKET_DESC;
+import static org.apache.nifi.processors.gcp.storage.StorageAttributes.KEY_DESC;
+
+/**
+ * Controller service that exposes a single Google Cloud Storage object as a {@link FileResource}.
+ * <p>
+ * The bucket and object name are evaluated per request against the supplied attributes, and the
+ * credentials are obtained from the configured {@link GCPCredentialsService}.
+ */
+@Tags({"file", "resource", "gcs"})
+@SeeAlso({FetchGCSObject.class})
+@CapabilityDescription("Provides a Google Cloud Storage (GCS) file resource for other components.")
+@UseCase(
+        description = "Fetch a specific file from GCS."
+                + " The service provides higher performance compared to fetch processors when the data should be moved between different storages without any transformation.",
+        configuration = """
+                "Bucket" = "${gcs.bucket}"
+                "Name" = "${filename}"
+
+                The "GCP Credentials Provider Service" property should specify an instance of the GCPCredentialsService in order to provide credentials for accessing the bucket.
+                """
+)
+public class GCSFileResourceService extends AbstractControllerService implements FileResourceService {
+
+    /** Bucket holding the object; supports Expression Language over the supplied attributes. */
+    public static final PropertyDescriptor BUCKET = new PropertyDescriptor
+            .Builder()
+            .name("Bucket")
+            .displayName("Bucket")
+            .description(BUCKET_DESC)
+            .required(true)
+            .defaultValue("${" + BUCKET_ATTR + "}")
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    /** Object name (key) within the bucket; supports Expression Language over the supplied attributes. */
+    public static final PropertyDescriptor KEY = new PropertyDescriptor
+            .Builder()
+            .name("Name")
+            .displayName("Name")
+            .description(KEY_DESC)
+            .required(true)
+            .defaultValue("${" + CoreAttributes.FILENAME.key() + "}")
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    private static final List<PropertyDescriptor> PROPERTIES = Arrays.asList(
+            BUCKET,
+            KEY,
+            GoogleUtils.GCP_CREDENTIALS_PROVIDER_SERVICE
+    );
+
+    // Set in onEnabled and cleared in onDisabled; null while the service is not enabled.
+    private volatile PropertyContext context;
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
+
+    /**
+     * Captures the configuration context so that the properties can be evaluated per request.
+     *
+     * @param context configuration of the service being enabled
+     */
+    @OnEnabled
+    public void onEnabled(final ConfigurationContext context) {
+        this.context = context;
+    }
+
+    /** Drops the captured context when the service is disabled. */
+    @OnDisabled
+    public void onDisabled() {
+        this.context = null;
+    }
+
+    /**
+     * Resolves the configured bucket and object name against the given attributes
+     * and opens the matching GCS object.
+     *
+     * @param attributes attributes used to evaluate the "Bucket" and "Name" expressions
+     * @return the object's content stream together with its size
+     * @throws ProcessException if the object does not exist or the storage call fails
+     * @throws IllegalArgumentException if the evaluated object name is empty
+     */
+    @Override
+    public FileResource getFileResource(Map<String, String> attributes) {
+        final GCPCredentialsService gcpCredentialsService = context.getProperty(GoogleUtils.GCP_CREDENTIALS_PROVIDER_SERVICE).asControllerService(GCPCredentialsService.class);
+        final Storage storage = getCloudService(gcpCredentialsService.getGoogleCredentials());
+
+        try {
+            return fetchBlob(storage, attributes);
+        } catch (final StorageException | IOException e) {
+            throw new ProcessException("Failed to fetch GCS Object", e);
+        }
+    }
+
+    /**
+     * Builds a storage client for the given credentials; protected so that subclasses can substitute the client.
+     *
+     * @param credentials credentials used to authenticate against GCS
+     * @return storage client
+     */
+    protected Storage getCloudService(GoogleCredentials credentials) {
+        final StorageOptions storageOptions = StorageOptions.newBuilder()
+                .setCredentials(credentials)
+                .build();
+
+        return storageOptions.getService();
+    }
+
+    /**
+     * Looks up the blob addressed by the evaluated "Bucket" and "Name" properties and opens it for reading.
+     *
+     * @param storage gcs storage
+     * @param attributes attributes used to evaluate the property expressions
+     * @return fetched blob as FileResource
+     * @throws IOException declared for the caller's multi-catch; NOTE(review): no statement here appears to raise it - confirm before removing
+     * @throws IllegalArgumentException if the evaluated blob name is empty
+     * @throws StorageException with code 404 if the blob does not exist
+     */
+    private FileResource fetchBlob(final Storage storage, final Map<String, String> attributes) throws IOException {
+        final String bucketName = context.getProperty(BUCKET).evaluateAttributeExpressions(attributes).getValue();
+        final String key = context.getProperty(KEY).evaluateAttributeExpressions(attributes).getValue();
+
+        final BlobId blobId = BlobId.of(bucketName, key);
+        // Reject an empty name up front instead of issuing a lookup for it.
+        if (blobId.getName() == null || blobId.getName().isEmpty()) {
+            throw new IllegalArgumentException("Blob Name is required");
+        }
+
+        // A null result is reported as a missing blob.
+        final Blob blob = storage.get(blobId);
+        if (blob == null) {
+            throw new StorageException(404, "Blob " + blobId + " not found");
+        }
+
+        final ReadChannel reader = storage.reader(blob.getBlobId());
+        return new FileResource(Channels.newInputStream(reader), blob.getSize());
+    }
+}
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService index ee70d2fbef..e5c9970ebc 100644 --- 
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService @@ -12,4 +12,5 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -org.apache.nifi.processors.gcp.credentials.service.GCPCredentialsControllerService \ No newline at end of file +org.apache.nifi.processors.gcp.credentials.service.GCPCredentialsControllerService +org.apache.nifi.processors.gcp.storage.GCSFileResourceService \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/FetchGCSObjectTest.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/FetchGCSObjectTest.java index bd66cab655..91e14aa772 100644 --- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/FetchGCSObjectTest.java +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/FetchGCSObjectTest.java @@ -16,8 +16,6 @@ */ package org.apache.nifi.processors.gcp.storage; -import com.google.cloud.ReadChannel; -import com.google.cloud.RestorableState; import com.google.cloud.storage.Acl; import com.google.cloud.storage.Blob; import com.google.cloud.storage.BlobId; @@ -25,8 +23,6 @@ import com.google.cloud.storage.BlobInfo; import com.google.cloud.storage.Storage; import com.google.cloud.storage.StorageException; import com.google.cloud.storage.StorageOptions; -import java.io.IOException; -import java.nio.ByteBuffer; import java.time.OffsetDateTime; import java.time.ZoneOffset; import java.util.HashMap; @@ -35,6 +31,7 @@ import java.util.Map; import 
java.util.Set; import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processors.gcp.util.MockReadChannel; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.junit.jupiter.api.AfterEach; @@ -130,7 +127,6 @@ public class FetchGCSObjectTest extends AbstractGCSTest { } } - @Override public AbstractGCSProcessor getProcessor() { return new FetchGCSObject() { @@ -146,65 +142,10 @@ public class FetchGCSObjectTest extends AbstractGCSTest { }; } - private static class MockReadChannel implements ReadChannel { - private final byte[] toRead; - private int position = 0; - private boolean finished; - private boolean isOpen; - - private MockReadChannel(String textToRead) { - this.toRead = textToRead.getBytes(); - this.isOpen = true; - this.finished = false; - } - - @Override - public void close() { - this.isOpen = false; - } - - @Override - public void seek(long l) throws IOException { - - } - - @Override - public void setChunkSize(int i) { - - } - - @Override - public RestorableState capture() { - return null; - } - - @Override - public int read(ByteBuffer dst) throws IOException { - if (this.finished) { - return -1; - } else { - if (dst.remaining() > this.toRead.length) { - this.finished = true; - } - int toWrite = Math.min(this.toRead.length - position, dst.remaining()); - - dst.put(this.toRead, this.position, toWrite); - this.position += toWrite; - - return toWrite; - } - } - - @Override - public boolean isOpen() { - return this.isOpen; - } - } - @Override protected void addRequiredPropertiesToRunner(TestRunner runner) { runner.setProperty(FetchGCSObject.BUCKET, BUCKET); - runner.setProperty(FetchGCSObject.KEY, String.valueOf(KEY)); + runner.setProperty(FetchGCSObject.KEY, KEY); } @Test @@ -249,7 +190,6 @@ public class FetchGCSObjectTest extends AbstractGCSTest { when(storage.get(any(BlobId.class))).thenReturn(blob); when(storage.reader(any(BlobId.class), 
any(Storage.BlobSourceOption[].class))).thenReturn(new MockReadChannel(CONTENT)); - runner.enqueue(""); runner.run();
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/GCSFileResourceServiceTest.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/GCSFileResourceServiceTest.java
new file mode 100644
index 0000000000..0396569dcd
--- /dev/null
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/GCSFileResourceServiceTest.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.gcp.storage;
+
+import com.google.auth.oauth2.GoogleCredentials;
+import com.google.cloud.storage.Blob;
+import com.google.cloud.storage.BlobId;
+import com.google.cloud.storage.Storage;
+import com.google.cloud.storage.testing.RemoteStorageHelper;
+import org.apache.nifi.fileresource.service.api.FileResource;
+import org.apache.nifi.gcp.credentials.service.GCPCredentialsService;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.gcp.credentials.service.GCPCredentialsControllerService;
+import org.apache.nifi.processors.gcp.util.MockReadChannel;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.NoOpProcessor;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.nifi.processors.gcp.util.GoogleUtils.GCP_CREDENTIALS_PROVIDER_SERVICE;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for {@link GCSFileResourceService} that run against a mocked {@link Storage} client.
+ */
+@ExtendWith(MockitoExtension.class)
+public class GCSFileResourceServiceTest {
+
+    private static final String TEST_NAME = GCSFileResourceServiceTest.class.getSimpleName();
+    private static final String CONTROLLER_SERVICE = "GCPCredentialsService";
+    private static final String BUCKET = RemoteStorageHelper.generateBucketName();
+    private static final String KEY = "test-file";
+    private static final BlobId BLOB_ID = BlobId.of(BUCKET, KEY);
+    private static final String TEST_DATA = "test-data";
+
+    @Mock
+    Storage storage;
+
+    private TestRunner runner;
+    private TestGCSFileResourceService service;
+
+    @BeforeEach
+    void setup() throws InitializationException {
+        service = new TestGCSFileResourceService(storage);
+        runner = TestRunners.newTestRunner(NoOpProcessor.class);
+        runner.addControllerService(TEST_NAME, service);
+    }
+
+    @Test
+    void testValidBlob(@Mock Blob blob) throws InitializationException, IOException {
+        when(blob.getBlobId()).thenReturn(BLOB_ID);
+        when(blob.getSize()).thenReturn((long) TEST_DATA.length());
+        when(storage.get(BLOB_ID)).thenReturn(blob);
+        when(storage.reader(BLOB_ID)).thenReturn(new MockReadChannel(TEST_DATA));
+
+        setUpService(KEY, BUCKET);
+
+        final FileResource fileResource = service.getFileResource(Collections.emptyMap());
+
+        assertFileResource(fileResource);
+    }
+
+    @Test
+    void testValidBlobUsingEL(@Mock Blob blob) throws IOException, InitializationException {
+        when(blob.getBlobId()).thenReturn(BLOB_ID);
+        when(blob.getSize()).thenReturn((long) TEST_DATA.length());
+        when(storage.get(BLOB_ID)).thenReturn(blob);
+        when(storage.reader(BLOB_ID)).thenReturn(new MockReadChannel(TEST_DATA));
+
+        final Map<String, String> attributes = setUpServiceWithEL(KEY, BUCKET);
+
+        final FileResource fileResource = service.getFileResource(attributes);
+
+        assertFileResource(fileResource);
+    }
+
+    @Test
+    void testValidBlobUsingELButMissingAttribute() throws InitializationException {
+        runner.setValidateExpressionUsage(false);
+
+        setUpServiceWithEL(KEY, BUCKET);
+
+        // The attributes the expressions refer to are not supplied, so no blob name can be resolved.
+        assertThrows(IllegalArgumentException.class, () -> service.getFileResource(Collections.emptyMap()));
+    }
+
+    @Test
+    void testNonExistingBlob() throws InitializationException {
+        final Map<String, String> attributes = setUpServiceWithEL("invalid-key", "invalid-bucket");
+
+        // storage.get is not stubbed here, so the mock yields no blob for the lookup.
+        assertThrows(ProcessException.class, () -> service.getFileResource(attributes));
+    }
+
+    /**
+     * Registers a credentials service, configures the service under test with the given values and enables it.
+     *
+     * @param key value for the "Name" property (literal or expression)
+     * @param bucket value for the "Bucket" property (literal or expression)
+     */
+    private void setUpService(String key, String bucket) throws InitializationException {
+        final GCPCredentialsService credentialsService = new GCPCredentialsControllerService();
+
+        runner.addControllerService(CONTROLLER_SERVICE, credentialsService);
+        runner.enableControllerService(credentialsService);
+
+        runner.setProperty(service, GCP_CREDENTIALS_PROVIDER_SERVICE, CONTROLLER_SERVICE);
+        runner.setProperty(service, GCSFileResourceService.KEY, key);
+        runner.setProperty(service, GCSFileResourceService.BUCKET, bucket);
+
+        runner.enableControllerService(service);
+    }
+
+    /**
+     * Configures both properties as expressions over dedicated attributes.
+     *
+     * @param key object name stored under the {@code file.key} attribute
+     * @param bucket bucket name stored under the {@code file.bucket} attribute
+     * @return attributes that resolve the configured expressions
+     */
+    private Map<String, String> setUpServiceWithEL(String key, String bucket) throws InitializationException {
+        final String keyAttribute = "file.key";
+        final String bucketAttribute = "file.bucket";
+
+        Map<String, String> attributes = new HashMap<>();
+        attributes.put(keyAttribute, key);
+        attributes.put(bucketAttribute, bucket);
+
+        setUpService(String.format("${%s}", keyAttribute), String.format("${%s}", bucketAttribute));
+
+        return attributes;
+    }
+
+    /** Verifies that the resource reports the expected size and yields the expected bytes. */
+    private void assertFileResource(FileResource fileResource) throws IOException {
+        assertNotNull(fileResource);
+        assertEquals(TEST_DATA.length(), fileResource.getSize());
+        try (final InputStream inputStream = fileResource.getInputStream()) {
+            assertArrayEquals(TEST_DATA.getBytes(), inputStream.readAllBytes());
+        }
+    }
+
+    /** Service variant that returns the mocked client instead of building a real one. */
+    private static class TestGCSFileResourceService extends GCSFileResourceService {
+
+        private final Storage storage;
+
+        public TestGCSFileResourceService(Storage storage) {
+            this.storage = storage;
+        }
+
+        @Override
+        protected Storage getCloudService(GoogleCredentials credentials) {
+            return storage;
+        }
+    }
+}
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/util/MockReadChannel.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/util/MockReadChannel.java new file mode 100644 index 0000000000..e1a6e3436a --- /dev/null +++ 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/util/MockReadChannel.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.gcp.util;
+
+import com.google.cloud.ReadChannel;
+import com.google.cloud.RestorableState;
+
+import java.nio.ByteBuffer;
+
+/**
+ * In-memory {@link ReadChannel} for tests that serves the bytes of a fixed string.
+ * <p>
+ * Seeking, chunk sizing and state capture are not supported: the first two are ignored
+ * and {@link #capture()} returns {@code null}.
+ */
+public class MockReadChannel implements ReadChannel {
+    private final byte[] toRead;
+    private int position = 0;
+    private boolean isOpen;
+
+    /**
+     * @param textToRead text whose bytes (platform default charset) are served by this channel
+     */
+    public MockReadChannel(String textToRead) {
+        this.toRead = textToRead.getBytes();
+        this.isOpen = true;
+    }
+
+    @Override
+    public void close() {
+        this.isOpen = false;
+    }
+
+    /** Ignored: the mock always reads sequentially from its current position. */
+    @Override
+    public void seek(long l) {}
+
+    /** Ignored: the mock does not read in chunks. */
+    @Override
+    public void setChunkSize(int i) {}
+
+    /** Not supported by the mock; always returns {@code null}. */
+    @Override
+    public RestorableState<ReadChannel> capture() {
+        return null;
+    }
+
+    /**
+     * Copies as many of the remaining bytes as fit into {@code dst}.
+     *
+     * @param dst buffer receiving the data
+     * @return number of bytes copied (0 when {@code dst} has no room), or -1 once all bytes have been delivered
+     */
+    @Override
+    public int read(ByteBuffer dst) {
+        final int available = this.toRead.length - this.position;
+        // Signal end-of-stream from the read position itself, so callers with small buffers also terminate.
+        if (available <= 0) {
+            return -1;
+        }
+
+        final int toWrite = Math.min(available, dst.remaining());
+        dst.put(this.toRead, this.position, toWrite);
+        this.position += toWrite;
+
+        return toWrite;
+    }
+
+    @Override
+    public boolean isOpen() {
+        return this.isOpen;
+    }
+}