NIFI-12334 Added GCSFileResourceService

This closes #7999

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Mark Bathori 2023-11-08 15:32:19 +01:00 committed by exceptionfactory
parent 30419c8dd8
commit 69df3f0f66
No known key found for this signature in database
GPG Key ID: 29B6A52D2AAE8DBA
6 changed files with 407 additions and 63 deletions

View File

@ -69,6 +69,10 @@
<version>2.0.0-SNAPSHOT</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-file-resource-service-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-conflict-resolution</artifactId>

View File

@ -0,0 +1,160 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.gcp.storage;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.ReadChannel;
import com.google.cloud.storage.Blob;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageException;
import com.google.cloud.storage.StorageOptions;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.documentation.UseCase;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.fileresource.service.api.FileResource;
import org.apache.nifi.fileresource.service.api.FileResourceService;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.gcp.credentials.service.GCPCredentialsService;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.gcp.util.GoogleUtils;
import java.io.IOException;
import java.nio.channels.Channels;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import static org.apache.nifi.processors.gcp.storage.StorageAttributes.BUCKET_ATTR;
import static org.apache.nifi.processors.gcp.storage.StorageAttributes.BUCKET_DESC;
import static org.apache.nifi.processors.gcp.storage.StorageAttributes.KEY_DESC;
@Tags({"file", "resource", "gcs"})
@SeeAlso({FetchGCSObject.class})
@CapabilityDescription("Provides a Google Compute Storage (GCS) file resource for other components.")
@UseCase(
description = "Fetch a specific file from GCS." +
" The service provides higher performance compared to fetch processors when the data should be moved between different storages without any transformation.",
configuration = """
"Bucket" = "${gcs.bucket}"
"Name" = "${filename}"
The "GCP Credentials Provider Service" property should specify an instance of the GCPCredentialsService in order to provide credentials for accessing the bucket.
"""
)
public class GCSFileResourceService extends AbstractControllerService implements FileResourceService {
public static final PropertyDescriptor BUCKET = new PropertyDescriptor
.Builder()
.name("Bucket")
.displayName("Bucket")
.description(BUCKET_DESC)
.required(true)
.defaultValue("${" + BUCKET_ATTR + "}")
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor KEY = new PropertyDescriptor
.Builder()
.name("Name")
.displayName("Name")
.description(KEY_DESC)
.required(true)
.defaultValue("${" + CoreAttributes.FILENAME.key() + "}")
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
private static final List<PropertyDescriptor> PROPERTIES = Arrays.asList(
BUCKET,
KEY,
GoogleUtils.GCP_CREDENTIALS_PROVIDER_SERVICE
);
private volatile PropertyContext context;
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return PROPERTIES;
}
@OnEnabled
public void onEnabled(final ConfigurationContext context) {
this.context = context;
}
@OnDisabled
public void onDisabled() {
this.context = null;
}
@Override
public FileResource getFileResource(Map<String, String> attributes) {
final GCPCredentialsService gcpCredentialsService = context.getProperty(GoogleUtils.GCP_CREDENTIALS_PROVIDER_SERVICE).asControllerService(GCPCredentialsService.class);
final Storage storage = getCloudService(gcpCredentialsService.getGoogleCredentials());
try {
return fetchBlob(storage, attributes);
} catch (final StorageException | IOException e) {
throw new ProcessException("Failed to fetch GCS Object", e);
}
}
protected Storage getCloudService(GoogleCredentials credentials) {
final StorageOptions storageOptions = StorageOptions.newBuilder()
.setCredentials(credentials)
.build();
return storageOptions.getService();
}
/**
* Fetching blob from the provided bucket.
*
* @param storage gcs storage
* @param attributes configuration attributes
* @return fetched blob as FileResource
* @throws IOException exception caused by missing parameters
*/
private FileResource fetchBlob(final Storage storage, final Map<String, String> attributes) throws IOException {
final String bucketName = context.getProperty(BUCKET).evaluateAttributeExpressions(attributes).getValue();
final String key = context.getProperty(KEY).evaluateAttributeExpressions(attributes).getValue();
final BlobId blobId = BlobId.of(bucketName, key);
if (blobId.getName() == null || blobId.getName().isEmpty()) {
throw new IllegalArgumentException("Blob Name is required");
}
final Blob blob = storage.get(blobId);
if (blob == null) {
throw new StorageException(404, "Blob " + blobId + " not found");
}
final ReadChannel reader = storage.reader(blob.getBlobId());
return new FileResource(Channels.newInputStream(reader), blob.getSize());
}
}

View File

@ -12,4 +12,5 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.nifi.processors.gcp.credentials.service.GCPCredentialsControllerService
org.apache.nifi.processors.gcp.credentials.service.GCPCredentialsControllerService
org.apache.nifi.processors.gcp.storage.GCSFileResourceService

View File

@ -16,8 +16,6 @@
*/
package org.apache.nifi.processors.gcp.storage;
import com.google.cloud.ReadChannel;
import com.google.cloud.RestorableState;
import com.google.cloud.storage.Acl;
import com.google.cloud.storage.Blob;
import com.google.cloud.storage.BlobId;
@ -25,8 +23,6 @@ import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageException;
import com.google.cloud.storage.StorageOptions;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.HashMap;
@ -35,6 +31,7 @@ import java.util.Map;
import java.util.Set;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processors.gcp.util.MockReadChannel;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.junit.jupiter.api.AfterEach;
@ -130,7 +127,6 @@ public class FetchGCSObjectTest extends AbstractGCSTest {
}
}
@Override
public AbstractGCSProcessor getProcessor() {
return new FetchGCSObject() {
@ -146,65 +142,10 @@ public class FetchGCSObjectTest extends AbstractGCSTest {
};
}
private static class MockReadChannel implements ReadChannel {
private final byte[] toRead;
private int position = 0;
private boolean finished;
private boolean isOpen;
private MockReadChannel(String textToRead) {
this.toRead = textToRead.getBytes();
this.isOpen = true;
this.finished = false;
}
@Override
public void close() {
this.isOpen = false;
}
@Override
public void seek(long l) throws IOException {
}
@Override
public void setChunkSize(int i) {
}
@Override
public RestorableState<ReadChannel> capture() {
return null;
}
@Override
public int read(ByteBuffer dst) throws IOException {
if (this.finished) {
return -1;
} else {
if (dst.remaining() > this.toRead.length) {
this.finished = true;
}
int toWrite = Math.min(this.toRead.length - position, dst.remaining());
dst.put(this.toRead, this.position, toWrite);
this.position += toWrite;
return toWrite;
}
}
@Override
public boolean isOpen() {
return this.isOpen;
}
}
@Override
protected void addRequiredPropertiesToRunner(TestRunner runner) {
runner.setProperty(FetchGCSObject.BUCKET, BUCKET);
runner.setProperty(FetchGCSObject.KEY, String.valueOf(KEY));
runner.setProperty(FetchGCSObject.KEY, KEY);
}
@Test
@ -249,7 +190,6 @@ public class FetchGCSObjectTest extends AbstractGCSTest {
when(storage.get(any(BlobId.class))).thenReturn(blob);
when(storage.reader(any(BlobId.class), any(Storage.BlobSourceOption[].class))).thenReturn(new MockReadChannel(CONTENT));
runner.enqueue("");
runner.run();

View File

@ -0,0 +1,166 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.gcp.storage;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.storage.Blob;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.testing.RemoteStorageHelper;
import org.apache.nifi.fileresource.service.api.FileResource;
import org.apache.nifi.gcp.credentials.service.GCPCredentialsService;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processors.gcp.credentials.service.GCPCredentialsControllerService;
import org.apache.nifi.processors.gcp.util.MockReadChannel;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.NoOpProcessor;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import java.io.IOException;
import java.io.InputStream;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import static org.apache.nifi.processors.gcp.util.GoogleUtils.GCP_CREDENTIALS_PROVIDER_SERVICE;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
public class GCSFileResourceServiceTest {
private static final String TEST_NAME = GCSFileResourceServiceTest.class.getSimpleName();
private static final String CONTROLLER_SERVICE = "GCPCredentialsService";
private static final String BUCKET = RemoteStorageHelper.generateBucketName();
private static final String KEY = "test-file";
private static final BlobId BLOB_ID = BlobId.of(BUCKET, KEY);
private static final String TEST_DATA = "test-data";
@Mock
Storage storage;
private TestRunner runner;
private TestGCSFileResourceService service;
@BeforeEach
void setup() throws InitializationException {
service = new TestGCSFileResourceService(storage);
runner = TestRunners.newTestRunner(NoOpProcessor.class);
runner.addControllerService(TEST_NAME, service);
}
@Test
void testValidBlob(@Mock Blob blob) throws InitializationException, IOException {
when(blob.getBlobId()).thenReturn(BLOB_ID);
when(blob.getSize()).thenReturn((long) TEST_DATA.length());
when(storage.get(BLOB_ID)).thenReturn(blob);
when(storage.reader(BLOB_ID)).thenReturn(new MockReadChannel(TEST_DATA));
setUpService(KEY, BUCKET);
final FileResource fileResource = service.getFileResource(Collections.emptyMap());
assertFileResource(fileResource);
}
@Test
void testValidBlobUsingEL(@Mock Blob blob) throws IOException, InitializationException {
when(blob.getBlobId()).thenReturn(BLOB_ID);
when(blob.getSize()).thenReturn((long) TEST_DATA.length());
when(storage.get(BLOB_ID)).thenReturn(blob);
when(storage.reader(BLOB_ID)).thenReturn(new MockReadChannel(TEST_DATA));
final Map<String, String> attributes = setUpServiceWithEL(KEY, BUCKET);
final FileResource fileResource = service.getFileResource(attributes);
assertFileResource(fileResource);
}
@Test
void testValidBlobUsingELButMissingAttribute() throws InitializationException {
runner.setValidateExpressionUsage(false);
setUpServiceWithEL(KEY, BUCKET);
assertThrows(IllegalArgumentException.class, () -> service.getFileResource(Collections.emptyMap()));
}
@Test
void testNonExistingBlob() throws InitializationException {
final Map<String, String> attributes = setUpServiceWithEL("invalid-key", "invalid-bucket");
assertThrows(ProcessException.class, () -> service.getFileResource(attributes));
}
private void setUpService(String key, String bucket) throws InitializationException {
final GCPCredentialsService credentialsService = new GCPCredentialsControllerService();
runner.addControllerService(CONTROLLER_SERVICE, credentialsService);
runner.enableControllerService(credentialsService);
runner.setProperty(service, GCP_CREDENTIALS_PROVIDER_SERVICE, CONTROLLER_SERVICE);
runner.setProperty(service, GCSFileResourceService.KEY, key);
runner.setProperty(service, GCSFileResourceService.BUCKET, bucket);
runner.enableControllerService(service);
}
private Map<String, String> setUpServiceWithEL(String key, String bucket) throws InitializationException {
final String keyAttribute = "file.key";
final String bucketAttribute = "file.bucket";
Map<String, String> attributes = new HashMap<>();
attributes.put(keyAttribute, key);
attributes.put(bucketAttribute, bucket);
setUpService(String.format("${%s}", keyAttribute), String.format("${%s}", bucketAttribute));
return attributes;
}
private void assertFileResource(FileResource fileResource) throws IOException {
assertNotNull(fileResource);
assertEquals(TEST_DATA.length(), fileResource.getSize());
try (final InputStream inputStream = fileResource.getInputStream()) {
assertArrayEquals(TEST_DATA.getBytes(), inputStream.readAllBytes());
}
}
private static class TestGCSFileResourceService extends GCSFileResourceService {
private final Storage storage;
public TestGCSFileResourceService(Storage storage) {
this.storage = storage;
}
@Override
protected Storage getCloudService(GoogleCredentials credentials) {
return storage;
}
}
}

View File

@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.gcp.util;
import com.google.cloud.ReadChannel;
import com.google.cloud.RestorableState;
import java.nio.ByteBuffer;
public class MockReadChannel implements ReadChannel {
private final byte[] toRead;
private int position = 0;
private boolean finished;
private boolean isOpen;
public MockReadChannel(String textToRead) {
this.toRead = textToRead.getBytes();
this.isOpen = true;
this.finished = false;
}
@Override
public void close() {
this.isOpen = false;
}
@Override
public void seek(long l) {}
@Override
public void setChunkSize(int i) {}
@Override
public RestorableState<ReadChannel> capture() {
return null;
}
@Override
public int read(ByteBuffer dst) {
if (this.finished) {
return -1;
} else {
if (dst.remaining() > this.toRead.length) {
this.finished = true;
}
int toWrite = Math.min(this.toRead.length - position, dst.remaining());
dst.put(this.toRead, this.position, toWrite);
this.position += toWrite;
return toWrite;
}
}
@Override
public boolean isOpen() {
return this.isOpen;
}
}