mirror of https://github.com/apache/nifi.git
NIFI-11266 PutGoogleDrive, ListGoogleDrive, FetchGoogleDrive can't access a SharedDrive
This closes #7058 Reviewed-by: Mark Bathori <bathori.mark@gmail.com> Signed-off-by: Nandor Soma Abonyi <nsabonyi@apache.org>
This commit is contained in:
parent
1a9acb1f4b
commit
fe2721786c
|
@ -168,6 +168,7 @@ public class FetchGoogleDrive extends AbstractProcessor implements GoogleDriveTr
|
|||
try (final InputStream driveFileInputStream = driveService
|
||||
.files()
|
||||
.get(fileId)
|
||||
.setSupportsAllDrives(true)
|
||||
.executeMediaAsInputStream()) {
|
||||
|
||||
return session.importFrom(driveFileInputStream, flowFile);
|
||||
|
@ -178,6 +179,7 @@ public class FetchGoogleDrive extends AbstractProcessor implements GoogleDriveTr
|
|||
return driveService
|
||||
.files()
|
||||
.get(fileId)
|
||||
.setSupportsAllDrives(true)
|
||||
.setFields("id, name, createdTime, mimeType, size")
|
||||
.execute();
|
||||
}
|
||||
|
|
|
@ -78,8 +78,8 @@ import org.apache.nifi.serialization.record.RecordSchema;
|
|||
@TriggerSerially
|
||||
@Tags({"google", "drive", "storage"})
|
||||
@CapabilityDescription("Lists concrete files (shortcuts are ignored) in a Google Drive folder. " +
|
||||
"Each listed file may result in one flowfile, the metadata being written as flowfile attributes. " +
|
||||
"Or - in case the 'Record Writer' property is set - the entire result is written as records to a single flowfile. " +
|
||||
"Each listed file may result in one FlowFile, the metadata being written as FlowFile attributes. " +
|
||||
"Or - in case the 'Record Writer' property is set - the entire result is written as records to a single FlowFile. " +
|
||||
"This Processor is designed to run on Primary Node only in a cluster. If the primary node changes, the new Primary Node will pick up where the " +
|
||||
"previous node left off without duplicating all of the data. " +
|
||||
"Please see Additional Details to set up access to Google Drive.")
|
||||
|
@ -266,6 +266,8 @@ public class ListGoogleDrive extends AbstractListProcessor<GoogleDriveFileInfo>
|
|||
do {
|
||||
FileList result = driveService.files()
|
||||
.list()
|
||||
.setSupportsAllDrives(true)
|
||||
.setIncludeItemsFromAllDrives(true)
|
||||
.setQ(queryBuilder.toString())
|
||||
.setPageToken(pageToken)
|
||||
.setFields("nextPageToken, files(id, name, size, createdTime, modifiedTime, mimeType)")
|
||||
|
@ -329,6 +331,8 @@ public class ListGoogleDrive extends AbstractListProcessor<GoogleDriveFileInfo>
|
|||
do {
|
||||
FileList directoryList = service.files()
|
||||
.list()
|
||||
.setSupportsAllDrives(true)
|
||||
.setIncludeItemsFromAllDrives(true)
|
||||
.setQ("'" + folderId + "' in parents "
|
||||
+ "and mimeType = 'application/vnd.google-apps.folder'"
|
||||
)
|
||||
|
|
|
@ -187,7 +187,7 @@ public class PutGoogleDrive extends AbstractProcessor implements GoogleDriveTrai
|
|||
REL_FAILURE
|
||||
)));
|
||||
|
||||
public static final String MULTIPART_UPLOAD_URL = "https://www.googleapis.com/upload/drive/v3/files?uploadType=multipart";
|
||||
public static final String MULTIPART_UPLOAD_URL = "https://www.googleapis.com/upload/drive/v3/files?uploadType=multipart&supportsAllDrives=true";
|
||||
|
||||
private volatile Drive driveService;
|
||||
|
||||
|
@ -324,10 +324,12 @@ public class PutGoogleDrive extends AbstractProcessor implements GoogleDriveTrai
|
|||
if (fileMetadata.getId() == null) {
|
||||
return driveService.files()
|
||||
.create(fileMetadata, mediaContent)
|
||||
.setSupportsAllDrives(true)
|
||||
.setFields("id, name, createdTime, mimeType, size");
|
||||
} else {
|
||||
return driveService.files()
|
||||
.update(fileMetadata.getId(), new File(), mediaContent)
|
||||
.setSupportsAllDrives(true)
|
||||
.setFields("id, name, createdTime, mimeType, size");
|
||||
}
|
||||
}
|
||||
|
@ -347,7 +349,7 @@ public class PutGoogleDrive extends AbstractProcessor implements GoogleDriveTrai
|
|||
return fileMetadata;
|
||||
} else {
|
||||
throw new ProcessException(format("Upload of File [%s] to Folder [%s] failed, HTTP error code: [%d]",
|
||||
fileMetadata.getName(), fileMetadata.getId(), response.getStatusCode()));
|
||||
fileMetadata.getName(), fileMetadata.getParents().stream().findFirst().orElse(""), response.getStatusCode()));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -368,6 +370,8 @@ public class PutGoogleDrive extends AbstractProcessor implements GoogleDriveTrai
|
|||
private Optional<File> checkFileExistence(String fileName, String parentId) throws IOException {
|
||||
final FileList result = driveService.files()
|
||||
.list()
|
||||
.setSupportsAllDrives(true)
|
||||
.setIncludeItemsFromAllDrives(true)
|
||||
.setQ(format("name='%s' and ('%s' in parents)", fileName, parentId))
|
||||
.setFields("files(name, id)")
|
||||
.execute();
|
||||
|
|
|
@ -25,6 +25,7 @@ import com.google.api.client.json.gson.GsonFactory;
|
|||
import com.google.api.services.drive.Drive;
|
||||
import com.google.api.services.drive.DriveScopes;
|
||||
import com.google.api.services.drive.model.File;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.nifi.processor.Processor;
|
||||
import org.apache.nifi.processors.gcp.credentials.factory.CredentialPropertyDescriptors;
|
||||
import org.apache.nifi.processors.gcp.credentials.service.GCPCredentialsControllerService;
|
||||
|
@ -45,6 +46,7 @@ import java.util.Arrays;
|
|||
* SHARED_FOLDER_ID - The ID of a Folder that is shared with the Service Account. The test will create files and sub-folders within this folder.<br />
|
||||
* <br />
|
||||
* Created files and folders are cleaned up, but it's advisable to dedicate a folder for this test so that it can be cleaned up easily should the test fail to do so.
|
||||
* In case your shared folder is located on a shared drive, give Service Account "Manager" permission on the shared drive to make it capable of deleting the created test files.
|
||||
* <br /><br />
|
||||
* WARNING: The creation of a file is not a synchronized operation, may need to adjust tests accordingly!
|
||||
*/
|
||||
|
@ -52,6 +54,8 @@ public abstract class AbstractGoogleDriveIT<T extends GoogleDriveTrait & Process
|
|||
protected static final String SHARED_FOLDER_ID = "";
|
||||
protected static final String DEFAULT_FILE_CONTENT = "test_content";
|
||||
|
||||
protected static final String LARGE_FILE_CONTENT = StringUtils.repeat("a", 355 * 1024);
|
||||
|
||||
private static final String CREDENTIAL_JSON_FILE_PATH = "";
|
||||
private static final JsonFactory JSON_FACTORY = GsonFactory.getDefaultInstance();
|
||||
|
||||
|
@ -91,6 +95,7 @@ public abstract class AbstractGoogleDriveIT<T extends GoogleDriveTrait & Process
|
|||
if (driveService != null) {
|
||||
driveService.files()
|
||||
.delete(mainFolderId)
|
||||
.setSupportsAllDrives(true)
|
||||
.execute();
|
||||
}
|
||||
}
|
||||
|
@ -120,6 +125,7 @@ public abstract class AbstractGoogleDriveIT<T extends GoogleDriveTrait & Process
|
|||
|
||||
Drive.Files.Create create = driveService.files()
|
||||
.create(fileMetaData)
|
||||
.setSupportsAllDrives(true)
|
||||
.setFields("id");
|
||||
|
||||
File file = create.execute();
|
||||
|
@ -140,6 +146,7 @@ public abstract class AbstractGoogleDriveIT<T extends GoogleDriveTrait & Process
|
|||
|
||||
Drive.Files.Create create = driveService.files()
|
||||
.create(fileMetadata, content)
|
||||
.setSupportsAllDrives(true)
|
||||
.setFields("id, name, modifiedTime, createdTime");
|
||||
|
||||
File file = create.execute();
|
||||
|
|
|
@ -98,10 +98,12 @@ public class FetchGoogleDriveTest extends AbstractGoogleDriveTest {
|
|||
private void mockFileDownload(String fileId) throws IOException {
|
||||
when(mockDriverService.files()
|
||||
.get(fileId)
|
||||
.setSupportsAllDrives(true)
|
||||
.executeMediaAsInputStream()).thenReturn(new ByteArrayInputStream(CONTENT.getBytes(UTF_8)));
|
||||
|
||||
when(mockDriverService.files()
|
||||
.get(fileId)
|
||||
.setSupportsAllDrives(true)
|
||||
.setFields("id, name, createdTime, mimeType, size")
|
||||
.execute()).thenReturn(createFile());
|
||||
}
|
||||
|
@ -109,6 +111,7 @@ public class FetchGoogleDriveTest extends AbstractGoogleDriveTest {
|
|||
private void mockFileDownloadError(String fileId, Exception exception) throws IOException {
|
||||
when(mockDriverService.files()
|
||||
.get(fileId)
|
||||
.setSupportsAllDrives(true)
|
||||
.executeMediaAsInputStream())
|
||||
.thenThrow(exception);
|
||||
}
|
||||
|
|
|
@ -26,10 +26,11 @@ import org.junit.jupiter.api.BeforeEach;
|
|||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.function.Function;
|
||||
|
||||
import static java.util.Arrays.asList;
|
||||
import static java.util.Collections.singletonList;
|
||||
import static org.apache.nifi.util.EqualsWrapper.wrapList;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.mockito.Answers.RETURNS_DEEP_STUBS;
|
||||
|
@ -73,19 +74,21 @@ public class ListGoogleDriveSimpleTest {
|
|||
|
||||
String id = "id_1";
|
||||
String filename = "file_name_1";
|
||||
Long size = 125L;
|
||||
long size = 125L;
|
||||
long createdTime = 123456L;
|
||||
long modifiedTime = 234567L;
|
||||
String mimeType = "mime_type_1";
|
||||
|
||||
when(mockDriverService.files()
|
||||
.list()
|
||||
.setSupportsAllDrives(true)
|
||||
.setIncludeItemsFromAllDrives(true)
|
||||
.setQ("('null' in parents) and (mimeType != 'application/vnd.google-apps.folder') and (mimeType != 'application/vnd.google-apps.shortcut') and trashed = false")
|
||||
.setPageToken(null)
|
||||
.setFields("nextPageToken, files(id, name, size, createdTime, modifiedTime, mimeType)")
|
||||
.execute()
|
||||
.getFiles()
|
||||
).thenReturn(Arrays.asList(
|
||||
).thenReturn(singletonList(
|
||||
createFile(
|
||||
id,
|
||||
filename,
|
||||
|
@ -96,7 +99,7 @@ public class ListGoogleDriveSimpleTest {
|
|||
)
|
||||
));
|
||||
|
||||
List<GoogleDriveFileInfo> expected = Arrays.asList(
|
||||
List<GoogleDriveFileInfo> expected = singletonList(
|
||||
new GoogleDriveFileInfo.Builder()
|
||||
.id(id)
|
||||
.fileName(filename)
|
||||
|
@ -111,7 +114,7 @@ public class ListGoogleDriveSimpleTest {
|
|||
List<GoogleDriveFileInfo> actual = testSubject.performListing(mockProcessContext, minTimestamp, null);
|
||||
|
||||
// THEN
|
||||
List<Function<GoogleDriveFileInfo, Object>> propertyProviders = Arrays.asList(
|
||||
List<Function<GoogleDriveFileInfo, Object>> propertyProviders = asList(
|
||||
GoogleDriveFileInfo::getId,
|
||||
GoogleDriveFileInfo::getIdentifier,
|
||||
GoogleDriveFileInfo::getName,
|
||||
|
|
|
@ -17,7 +17,7 @@
|
|||
package org.apache.nifi.processors.gcp.drive;
|
||||
|
||||
import static java.lang.String.valueOf;
|
||||
import static java.util.Arrays.asList;
|
||||
import static java.util.Collections.singletonList;
|
||||
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
@ -50,7 +50,7 @@ public class ListGoogleDriveTestRunnerTest implements OutputChecker {
|
|||
|
||||
private Drive mockDriverService;
|
||||
|
||||
private String folderId = "folderId";
|
||||
private final String folderId = "folderId";
|
||||
|
||||
@BeforeEach
|
||||
void setUp() throws Exception {
|
||||
|
@ -118,7 +118,7 @@ public class ListGoogleDriveTestRunnerTest implements OutputChecker {
|
|||
|
||||
mockFetchedGoogleDriveFileList(id, filename, size, createdTime, modifiedTime, mimeType);
|
||||
|
||||
List<String> expectedContents = asList(
|
||||
List<String> expectedContents = singletonList(
|
||||
"[" +
|
||||
"{" +
|
||||
"\"drive.id\":\"" + id + "\"," +
|
||||
|
@ -143,12 +143,14 @@ public class ListGoogleDriveTestRunnerTest implements OutputChecker {
|
|||
private void mockFetchedGoogleDriveFileList(String id, String filename, Long size, Long createdTime, Long modifiedTime, String mimeType) throws IOException {
|
||||
when(mockDriverService.files()
|
||||
.list()
|
||||
.setSupportsAllDrives(true)
|
||||
.setIncludeItemsFromAllDrives(true)
|
||||
.setQ("('" + folderId + "' in parents) and (mimeType != 'application/vnd.google-apps.folder') and (mimeType != 'application/vnd.google-apps.shortcut') and trashed = false")
|
||||
.setPageToken(null)
|
||||
.setFields("nextPageToken, files(id, name, size, createdTime, modifiedTime, mimeType)")
|
||||
.execute()
|
||||
.getFiles()
|
||||
).thenReturn(asList(
|
||||
).thenReturn(singletonList(
|
||||
createFile(
|
||||
id,
|
||||
filename,
|
||||
|
@ -170,7 +172,7 @@ public class ListGoogleDriveTestRunnerTest implements OutputChecker {
|
|||
inputFlowFileAttributes.put(GoogleDriveAttributes.TIMESTAMP, valueOf(expectedTimestamp));
|
||||
inputFlowFileAttributes.put(GoogleDriveAttributes.MIME_TYPE, mimeType);
|
||||
|
||||
HashSet<Map<String, String>> expectedAttributes = new HashSet<>(asList(inputFlowFileAttributes));
|
||||
HashSet<Map<String, String>> expectedAttributes = new HashSet<>(singletonList(inputFlowFileAttributes));
|
||||
|
||||
testRunner.run();
|
||||
|
||||
|
|
|
@ -48,14 +48,11 @@ public class PutGoogleDriveIT extends AbstractGoogleDriveIT<PutGoogleDrive> impl
|
|||
|
||||
@Test
|
||||
void testUploadFileToFolderById() {
|
||||
// GIVEN
|
||||
testRunner.setProperty(FOLDER_ID, mainFolderId);
|
||||
testRunner.setProperty(FILE_NAME, TEST_FILENAME);
|
||||
|
||||
// WHEN
|
||||
runWithFileContent();
|
||||
|
||||
// THEN
|
||||
testRunner.assertTransferCount(PutGoogleDrive.REL_SUCCESS, 1);
|
||||
testRunner.assertTransferCount(PutGoogleDrive.REL_FAILURE, 0);
|
||||
final List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(PutGoogleDrive.REL_SUCCESS);
|
||||
|
@ -65,22 +62,17 @@ public class PutGoogleDriveIT extends AbstractGoogleDriveIT<PutGoogleDrive> impl
|
|||
|
||||
@Test
|
||||
void testUploadedFileAlreadyExistsFailResolution() {
|
||||
// GIVEN
|
||||
testRunner.setProperty(FOLDER_ID, mainFolderId);
|
||||
testRunner.setProperty(FILE_NAME, TEST_FILENAME);
|
||||
|
||||
// WHEN
|
||||
runWithFileContent();
|
||||
|
||||
// THEN
|
||||
testRunner.assertTransferCount(PutGoogleDrive.REL_SUCCESS, 1);
|
||||
testRunner.assertTransferCount(PutGoogleDrive.REL_FAILURE, 0);
|
||||
testRunner.clearTransferState();
|
||||
|
||||
// WHEN
|
||||
runWithFileContent();
|
||||
|
||||
// THEN
|
||||
testRunner.assertTransferCount(PutGoogleDrive.REL_SUCCESS, 0);
|
||||
testRunner.assertTransferCount(PutGoogleDrive.REL_FAILURE, 1);
|
||||
|
||||
|
@ -88,23 +80,18 @@ public class PutGoogleDriveIT extends AbstractGoogleDriveIT<PutGoogleDrive> impl
|
|||
|
||||
@Test
|
||||
void testUploadedFileAlreadyExistsReplaceResolution() {
|
||||
// GIVEN
|
||||
testRunner.setProperty(FOLDER_ID, mainFolderId);
|
||||
testRunner.setProperty(FILE_NAME, TEST_FILENAME);
|
||||
testRunner.setProperty(PutGoogleDrive.CONFLICT_RESOLUTION, REPLACE.getValue());
|
||||
|
||||
// WHEN
|
||||
runWithFileContent();
|
||||
|
||||
// THEN
|
||||
testRunner.assertTransferCount(PutGoogleDrive.REL_SUCCESS, 1);
|
||||
testRunner.assertTransferCount(PutGoogleDrive.REL_FAILURE, 0);
|
||||
testRunner.clearTransferState();
|
||||
|
||||
// WHEN
|
||||
runWithFileContent("012345678");
|
||||
|
||||
// THEN
|
||||
testRunner.assertTransferCount(PutGoogleDrive.REL_SUCCESS, 1);
|
||||
testRunner.assertTransferCount(PutGoogleDrive.REL_FAILURE, 0);
|
||||
|
||||
|
@ -115,27 +102,36 @@ public class PutGoogleDriveIT extends AbstractGoogleDriveIT<PutGoogleDrive> impl
|
|||
|
||||
@Test
|
||||
void testUploadedFileAlreadyExistsIgnoreResolution() {
|
||||
// GIVEN
|
||||
testRunner.setProperty(FOLDER_ID, mainFolderId);
|
||||
testRunner.setProperty(FILE_NAME, TEST_FILENAME);
|
||||
testRunner.setProperty(PutGoogleDrive.CONFLICT_RESOLUTION, IGNORE.getValue());
|
||||
|
||||
// WHEN
|
||||
runWithFileContent();
|
||||
|
||||
// THEN
|
||||
testRunner.assertTransferCount(PutGoogleDrive.REL_SUCCESS, 1);
|
||||
testRunner.assertTransferCount(PutGoogleDrive.REL_FAILURE, 0);
|
||||
testRunner.clearTransferState();
|
||||
|
||||
// WHEN
|
||||
runWithFileContent();
|
||||
|
||||
// THEN
|
||||
testRunner.assertTransferCount(PutGoogleDrive.REL_SUCCESS, 1);
|
||||
testRunner.assertTransferCount(PutGoogleDrive.REL_FAILURE, 0);
|
||||
}
|
||||
|
||||
@Test
|
||||
void testChunkedUpload() {
|
||||
testRunner.setProperty(FOLDER_ID, mainFolderId);
|
||||
testRunner.setProperty(FILE_NAME, TEST_FILENAME);
|
||||
testRunner.setProperty(PutGoogleDrive.CHUNKED_UPLOAD_SIZE, "256 KB");
|
||||
testRunner.setProperty(PutGoogleDrive.CHUNKED_UPLOAD_THRESHOLD, "300 KB");
|
||||
|
||||
runWithFileContent(LARGE_FILE_CONTENT);
|
||||
|
||||
testRunner.assertTransferCount(PutGoogleDrive.REL_SUCCESS, 1);
|
||||
testRunner.assertTransferCount(PutGoogleDrive.REL_FAILURE, 0);
|
||||
}
|
||||
|
||||
|
||||
private void runWithFileContent() {
|
||||
runWithFileContent(DEFAULT_FILE_CONTENT);
|
||||
}
|
||||
|
|
|
@ -192,6 +192,7 @@ public class PutGoogleDriveTest extends AbstractGoogleDriveTest{
|
|||
private void mockFileUpload(File uploadedFile) throws IOException {
|
||||
when(mockDriverService.files()
|
||||
.create(any(File.class), any(InputStreamContent.class))
|
||||
.setSupportsAllDrives(true)
|
||||
.setFields("id, name, createdTime, mimeType, size")
|
||||
.execute())
|
||||
.thenReturn(uploadedFile);
|
||||
|
@ -200,6 +201,7 @@ public class PutGoogleDriveTest extends AbstractGoogleDriveTest{
|
|||
private void mockFileUpdate(File uploadedFile) throws IOException {
|
||||
when(mockDriverService.files()
|
||||
.update(eq(uploadedFile.getId()), any(File.class), any(InputStreamContent.class))
|
||||
.setSupportsAllDrives(true)
|
||||
.setFields("id, name, createdTime, mimeType, size")
|
||||
.execute())
|
||||
.thenReturn(uploadedFile);
|
||||
|
@ -207,13 +209,16 @@ public class PutGoogleDriveTest extends AbstractGoogleDriveTest{
|
|||
|
||||
private void mockFileUploadError(Exception exception) throws IOException {
|
||||
when(mockDriverService.files()
|
||||
.create(any(File.class), any(InputStreamContent.class)))
|
||||
.create(any(File.class), any(InputStreamContent.class))
|
||||
.setSupportsAllDrives(true))
|
||||
.thenThrow(exception);
|
||||
}
|
||||
|
||||
private void mockFileExists(List<File> fileList) throws IOException {
|
||||
when(mockDriverService.files()
|
||||
.list()
|
||||
.setSupportsAllDrives(true)
|
||||
.setIncludeItemsFromAllDrives(true)
|
||||
.setQ(format("name='%s' and ('%s' in parents)", TEST_FILENAME, SHARED_FOLDER_ID))
|
||||
.setFields("files(name, id)")
|
||||
.execute())
|
||||
|
|
Loading…
Reference in New Issue