NIFI-10223 Created ListGoogleDrive processor.

This closes #6200.

Signed-off-by: Peter Turcsanyi <turcsanyi@apache.org>
This commit is contained in:
Tamas Palfy 2022-07-12 22:28:41 +02:00 committed by Peter Turcsanyi
parent ccf3866261
commit ad781170e3
No known key found for this signature in database
GPG Key ID: 55A813F1C3E553DC
9 changed files with 877 additions and 8 deletions

View File

@ -923,6 +923,11 @@ The following binary components are provided under the Apache Software License v
Google Cloud Client Library for Java
Copyright 2016-2021 Google Inc. All Rights Reserved.
(ASLv2) Google Java API Client Services - Drive API
The following NOTICE information applies:
Google Java API Client Services - Drive API
Copyright 2011-2022 Google Inc. All Rights Reserved.
(ASLv2) Guava
The following NOTICE information applies:
Guava

View File

@ -15,6 +15,11 @@ The following binary components are provided under the Apache Software License v
Google Cloud Client Library for Java
Copyright 2016 Google Inc. All Rights Reserved.
(ASLv2) Google Java API Client Services - Drive API
The following NOTICE information applies:
Google Java API Client Services - Drive API
Copyright 2011-2022 Google Inc. All Rights Reserved.
(ASLv2) The Netty Project
The following NOTICE information applies:
The Netty Project

View File

@ -114,6 +114,11 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.google.apis</groupId>
<artifactId>google-api-services-drive</artifactId>
<version>v3-rev20220709-1.32.1</version>
</dependency>
<dependency>
<groupId>com.tdunning</groupId>
<artifactId>json</artifactId>

View File

@ -31,6 +31,7 @@ import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.gcp.util.GoogleUtils;
import org.apache.nifi.proxy.ProxyConfiguration;
import java.net.Proxy;
@ -106,17 +107,14 @@ public abstract class AbstractGCPProcessor<
.build();
/**
* Links to the {@link GCPCredentialsService} which provides credentials for this particular processor.
* Deprecated - Use {@link GoogleUtils#GCP_CREDENTIALS_PROVIDER_SERVICE} instead
*/
@Deprecated
public static final PropertyDescriptor GCP_CREDENTIALS_PROVIDER_SERVICE = new PropertyDescriptor.Builder()
.name("gcp-credentials-provider-service")
.name("GCP Credentials Provider Service")
.description("The Controller Service used to obtain Google Cloud Platform credentials.")
.required(true)
.identifiesControllerService(GCPCredentialsService.class)
.fromPropertyDescriptor(GoogleUtils.GCP_CREDENTIALS_PROVIDER_SERVICE)
.name("GCP Credentials Provider Service") // For backward compatibility
.build();
protected volatile CloudService cloudService;
protected CloudService getCloudService() {

View File

@ -0,0 +1,202 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.gcp.drive;
import org.apache.nifi.processor.util.list.ListableEntity;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class GoogleDriveFileInfo implements ListableEntity {
private static final RecordSchema SCHEMA;
private static final String ID = "id";
private static final String FILENAME = "filename";
private static final String SIZE = "size";
private static final String CREATED_TIME = "createdTime";
private static final String MODIFIED_TIME = "modifiedTime";
private static final String MIME_TYPE = "mimeType";
static {
final List<RecordField> recordFields = new ArrayList<>();
recordFields.add(new RecordField(ID, RecordFieldType.STRING.getDataType(), false));
recordFields.add(new RecordField(FILENAME, RecordFieldType.STRING.getDataType(), false));
recordFields.add(new RecordField(SIZE, RecordFieldType.LONG.getDataType(), false));
recordFields.add(new RecordField(CREATED_TIME, RecordFieldType.TIMESTAMP.getDataType(), false));
recordFields.add(new RecordField(MODIFIED_TIME, RecordFieldType.TIMESTAMP.getDataType(), false));
recordFields.add(new RecordField(MIME_TYPE, RecordFieldType.STRING.getDataType()));
SCHEMA = new SimpleRecordSchema(recordFields);
}
private final String id;
private final String fileName;
private final long size;
private final long createdTime;
private final long modifiedTime;
private final String mimeType;
public String getId() {
return id;
}
public String getFileName() {
return fileName;
}
public long getCreatedTime() {
return createdTime;
}
public long getModifiedTime() {
return modifiedTime;
}
public String getMimeType() {
return mimeType;
}
@Override
public Record toRecord() {
final Map<String, Object> values = new HashMap<>();
values.put(ID, getId());
values.put(FILENAME, getName());
values.put(SIZE, getSize());
values.put(CREATED_TIME, getCreatedTime());
values.put(MODIFIED_TIME, getModifiedTime());
values.put(MIME_TYPE, getMimeType());
return new MapRecord(SCHEMA, values);
}
public static RecordSchema getRecordSchema() {
return SCHEMA;
}
public static final class Builder {
private String id;
private String fileName;
private long size;
private long createdTime;
private long modifiedTime;
private String mimeType;
public Builder id(String id) {
this.id = id;
return this;
}
public Builder fileName(String fileName) {
this.fileName = fileName;
return this;
}
public Builder size(long size) {
this.size = size;
return this;
}
public Builder createdTime(long createdTime) {
this.createdTime = createdTime;
return this;
}
public Builder modifiedTime(long modifiedTime) {
this.modifiedTime = modifiedTime;
return this;
}
public Builder mimeType(String mimeType) {
this.mimeType = mimeType;
return this;
}
public GoogleDriveFileInfo build() {
return new GoogleDriveFileInfo(this);
}
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((id == null) ? 0 : id.hashCode());
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
GoogleDriveFileInfo other = (GoogleDriveFileInfo) obj;
if (id == null) {
if (other.id != null) {
return false;
}
} else if (!id.equals(other.id)) {
return false;
}
return true;
}
private GoogleDriveFileInfo(final Builder builder) {
this.id = builder.id;
this.fileName = builder.fileName;
this.size = builder.size;
this.createdTime = builder.createdTime;
this.modifiedTime = builder.modifiedTime;
this.mimeType = builder.mimeType;
}
@Override
public String getName() {
return getFileName();
}
@Override
public String getIdentifier() {
return getId();
}
@Override
public long getTimestamp() {
long timestamp = Math.max(getCreatedTime(), getModifiedTime());
return timestamp;
}
@Override
public long getSize() {
return size;
}
}

View File

@ -0,0 +1,364 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.gcp.drive;
import com.google.api.client.http.HttpTransport;
import com.google.api.client.json.JsonFactory;
import com.google.api.client.json.gson.GsonFactory;
import com.google.api.services.drive.Drive;
import com.google.api.services.drive.DriveScopes;
import com.google.api.services.drive.model.File;
import com.google.api.services.drive.model.FileList;
import com.google.auth.http.HttpCredentialsAdapter;
import com.google.auth.oauth2.GoogleCredentials;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
import org.apache.nifi.annotation.behavior.Stateful;
import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.gcp.credentials.service.GCPCredentialsService;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processor.util.list.AbstractListProcessor;
import org.apache.nifi.processor.util.list.ListedEntityTracker;
import org.apache.nifi.processors.gcp.ProxyAwareTransportFactory;
import org.apache.nifi.processors.gcp.util.GoogleUtils;
import org.apache.nifi.proxy.ProxyConfiguration;
import org.apache.nifi.serialization.record.RecordSchema;
import java.io.IOException;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@PrimaryNodeOnly
@TriggerSerially
@Tags({"google", "drive", "storage"})
@CapabilityDescription("Lists concrete files (shortcuts are ignored) in a Google Drive folder. " +
"Each listed file may result in one flowfile, the metadata being written as flowfile attributes. " +
"Or - in case the 'Record Writer' property is set - the entire result is written as records to a single flowfile. " +
"This Processor is designed to run on Primary Node only in a cluster. If the primary node changes, the new Primary Node will pick up where the " +
"previous node left off without duplicating all of the data.")
@InputRequirement(Requirement.INPUT_FORBIDDEN)
@WritesAttributes({@WritesAttribute(attribute = "drive.id", description = "The id of the file"),
@WritesAttribute(attribute = "filename", description = "The name of the file"),
@WritesAttribute(attribute = "drive.size", description = "The size of the file"),
@WritesAttribute(attribute = "drive.timestamp", description = "The last modified time or created time (whichever is greater) of the file." +
" The reason for this is that the original modified date of a file is preserved when uploaded to Google Drive." +
" 'Created time' takes the time when the upload occurs. However uploaded files can still be modified later."),
@WritesAttribute(attribute = "mime.type", description = "MimeType of the file")})
@Stateful(scopes = {Scope.CLUSTER}, description = "The processor stores necessary data to be able to keep track what files have been listed already." +
" What exactly needs to be stored depends on the 'Listing Strategy'." +
" State is stored across the cluster so that this Processor can be run on Primary Node only and if a new Primary Node is selected, the new node can pick up" +
" where the previous node left off, without duplicating the data.")
public class ListGoogleDrive extends AbstractListProcessor<GoogleDriveFileInfo> {
private static final String APPLICATION_NAME = "NiFi";
private static final JsonFactory JSON_FACTORY = GsonFactory.getDefaultInstance();
private volatile HttpTransport httpTransport;
public static final PropertyDescriptor FOLDER_ID = new PropertyDescriptor.Builder()
.name("folder-id")
.displayName("Folder ID")
.description("The ID of the folder from which to pull list of files. Needs to be shared with a Service Account." +
" WARNING: Unauthorized access to the folder is treated as if the folder was empty." +
" This results in the processor not creating result flowfiles. No additional error message is provided.")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.required(true)
.build();
public static final PropertyDescriptor RECURSIVE_SEARCH = new PropertyDescriptor.Builder()
.name("recursive-search")
.displayName("Search Recursively")
.description("When 'true', will include list of files from concrete sub-folders (ignores shortcuts)." +
" Otherwise, will return only files that have the defined 'Folder ID' as their parent directly." +
" WARNING: The listing may fail if there are too many sub-folders (500+).")
.required(true)
.defaultValue("true")
.allowableValues("true", "false")
.build();
public static final PropertyDescriptor MIN_AGE = new PropertyDescriptor.Builder()
.name("min-age")
.displayName("Minimum File Age")
.description("The minimum age a file must be in order to be considered; any files younger than this will be ignored.")
.required(true)
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.defaultValue("0 sec")
.build();
public static final PropertyDescriptor LISTING_STRATEGY = new PropertyDescriptor.Builder()
.fromPropertyDescriptor(AbstractListProcessor.LISTING_STRATEGY)
.allowableValues(BY_TIMESTAMPS, BY_ENTITIES, BY_TIME_WINDOW, NO_TRACKING)
.build();
public static final PropertyDescriptor TRACKING_STATE_CACHE = new PropertyDescriptor.Builder()
.fromPropertyDescriptor(ListedEntityTracker.TRACKING_STATE_CACHE)
.dependsOn(LISTING_STRATEGY, BY_ENTITIES)
.build();
public static final PropertyDescriptor TRACKING_TIME_WINDOW = new PropertyDescriptor.Builder()
.fromPropertyDescriptor(ListedEntityTracker.TRACKING_TIME_WINDOW)
.dependsOn(LISTING_STRATEGY, BY_ENTITIES)
.build();
public static final PropertyDescriptor INITIAL_LISTING_TARGET = new PropertyDescriptor.Builder()
.fromPropertyDescriptor(ListedEntityTracker.INITIAL_LISTING_TARGET)
.dependsOn(LISTING_STRATEGY, BY_ENTITIES)
.build();
private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
GoogleUtils.GCP_CREDENTIALS_PROVIDER_SERVICE,
FOLDER_ID,
RECURSIVE_SEARCH,
MIN_AGE,
LISTING_STRATEGY,
TRACKING_STATE_CACHE,
TRACKING_TIME_WINDOW,
INITIAL_LISTING_TARGET,
RECORD_WRITER,
ProxyConfiguration.createProxyConfigPropertyDescriptor(false, ProxyAwareTransportFactory.PROXY_SPECS)
));
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return PROPERTIES;
}
@Override
protected void customValidate(ValidationContext validationContext, Collection<ValidationResult> results) {
}
@Override
protected Map<String, String> createAttributes(
final GoogleDriveFileInfo entity,
final ProcessContext context
) {
final Map<String, String> attributes = new HashMap<>();
attributes.put("drive.id", entity.getId());
attributes.put("filename", entity.getName());
attributes.put("drive.size", String.valueOf(entity.getSize()));
attributes.put("drive.timestamp", String.valueOf(entity.getTimestamp()));
attributes.put("mime.type", entity.getMimeType());
return attributes;
}
@OnScheduled
public void onScheduled(final ProcessContext context) throws IOException {
final ProxyConfiguration proxyConfiguration = ProxyConfiguration.getConfiguration(context);
httpTransport = new ProxyAwareTransportFactory(proxyConfiguration).create();
}
@Override
protected String getListingContainerName(final ProcessContext context) {
return String.format("Google Drive Folder [%s]", getPath(context));
}
@Override
protected String getPath(final ProcessContext context) {
return context.getProperty(FOLDER_ID).evaluateAttributeExpressions().getValue();
}
@Override
protected boolean isListingResetNecessary(final PropertyDescriptor property) {
return LISTING_STRATEGY.equals(property)
|| FOLDER_ID.equals(property)
|| RECURSIVE_SEARCH.equals(property);
}
@Override
protected Scope getStateScope(final PropertyContext context) {
return Scope.CLUSTER;
}
@Override
protected RecordSchema getRecordSchema() {
return GoogleDriveFileInfo.getRecordSchema();
}
@Override
protected String getDefaultTimePrecision() {
return PRECISION_SECONDS.getValue();
}
@Override
protected List<GoogleDriveFileInfo> performListing(
final ProcessContext context,
final Long minTimestamp,
final ListingMode listingMode
) throws IOException {
final List<GoogleDriveFileInfo> listing = new ArrayList<>();
final String folderId = context.getProperty(FOLDER_ID).evaluateAttributeExpressions().getValue();
final Boolean recursive = context.getProperty(RECURSIVE_SEARCH).asBoolean();
final Long minAge = context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
Drive driveService = new Drive.Builder(
this.httpTransport,
JSON_FACTORY,
getHttpCredentialsAdapter(
context,
Arrays.asList(DriveScopes.DRIVE_METADATA_READONLY)
)
)
.setApplicationName(APPLICATION_NAME)
.build();
StringBuilder queryBuilder = new StringBuilder();
queryBuilder.append(buildQueryForDirs(driveService, folderId, recursive));
queryBuilder.append(" and (mimeType != 'application/vnd.google-apps.folder')");
queryBuilder.append(" and (mimeType != 'application/vnd.google-apps.shortcut')");
queryBuilder.append(" and trashed = false");
if (minTimestamp != null && minTimestamp > 0) {
String formattedMinTimestamp = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(OffsetDateTime.ofInstant(Instant.ofEpochMilli(minTimestamp), ZoneOffset.UTC));
queryBuilder.append(" and (");
queryBuilder.append("modifiedTime >= '" + formattedMinTimestamp + "'");
queryBuilder.append(" or createdTime >= '" + formattedMinTimestamp + "'");
queryBuilder.append(")");
}
if (minAge != null && minAge > 0) {
long maxTimestamp = System.currentTimeMillis() - minAge;
String formattedMaxTimestamp = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(OffsetDateTime.ofInstant(Instant.ofEpochMilli(maxTimestamp), ZoneOffset.UTC));
queryBuilder.append(" and modifiedTime < '" + formattedMaxTimestamp + "'");
queryBuilder.append(" and createdTime < '" + formattedMaxTimestamp + "'");
}
String pageToken = null;
do {
FileList result = driveService.files()
.list()
.setQ(queryBuilder.toString())
.setPageToken(pageToken)
.setFields("nextPageToken, files(id, name, size, createdTime, modifiedTime, mimeType)")
.execute();
for (File file : result.getFiles()) {
GoogleDriveFileInfo.Builder builder = new GoogleDriveFileInfo.Builder()
.id(file.getId())
.fileName(file.getName())
.size(file.getSize())
.createdTime(file.getCreatedTime().getValue())
.modifiedTime(file.getModifiedTime().getValue())
.mimeType(file.getMimeType());
listing.add(builder.build());
}
pageToken = result.getNextPageToken();
} while (pageToken != null);
return listing;
}
@Override
protected Integer countUnfilteredListing(final ProcessContext context) throws IOException {
return performListing(context, null, ListingMode.CONFIGURATION_VERIFICATION).size();
}
HttpCredentialsAdapter getHttpCredentialsAdapter(
final ProcessContext context,
final Collection<String> scopes
) {
GoogleCredentials googleCredentials = getGoogleCredentials(context);
HttpCredentialsAdapter httpCredentialsAdapter = new HttpCredentialsAdapter(googleCredentials.createScoped(scopes));
return httpCredentialsAdapter;
}
private GoogleCredentials getGoogleCredentials(final ProcessContext context) {
final GCPCredentialsService gcpCredentialsService = context.getProperty(GoogleUtils.GCP_CREDENTIALS_PROVIDER_SERVICE)
.asControllerService(GCPCredentialsService.class);
return gcpCredentialsService.getGoogleCredentials();
}
private static String buildQueryForDirs(
final Drive service,
final String folderId,
boolean recursive
) throws IOException {
StringBuilder queryBuilder = new StringBuilder("('")
.append(folderId)
.append("' in parents");
if (recursive) {
List<File> subDirectoryList = new LinkedList<>();
collectSubDirectories(service, folderId, subDirectoryList);
for (File subDirectory : subDirectoryList) {
queryBuilder.append(" or '")
.append(subDirectory.getId())
.append("' in parents");
}
}
queryBuilder.append(")");
return queryBuilder.toString();
}
private static void collectSubDirectories(
final Drive service,
final String folderId,
final List<File> dirList
) throws IOException {
String pageToken = null;
do {
FileList directoryList = service.files()
.list()
.setQ("'" + folderId + "' in parents "
+ "and mimeType = 'application/vnd.google-apps.folder'"
)
.setPageToken(pageToken)
.execute();
for (File directory : directoryList.getFiles()) {
dirList.add(directory);
collectSubDirectories(service, directory.getId(), dirList);
}
pageToken = directoryList.getNextPageToken();
} while (pageToken != null);
}
}

View File

@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.gcp.util;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.gcp.credentials.service.GCPCredentialsService;
public class GoogleUtils {
/**
* Links to the {@link GCPCredentialsService} which provides credentials for this particular processor.
*/
public static final PropertyDescriptor GCP_CREDENTIALS_PROVIDER_SERVICE = new PropertyDescriptor.Builder()
.name("gcp-credentials-provider-service")
.displayName("GCP Credentials Provider Service")
.description("The Controller Service used to obtain Google Cloud Platform credentials.")
.required(true)
.identifiesControllerService(GCPCredentialsService.class)
.build();
}

View File

@ -21,4 +21,5 @@ org.apache.nifi.processors.gcp.pubsub.ConsumeGCPubSub
org.apache.nifi.processors.gcp.pubsub.lite.PublishGCPubSubLite
org.apache.nifi.processors.gcp.pubsub.lite.ConsumeGCPubSubLite
org.apache.nifi.processors.gcp.bigquery.PutBigQueryBatch
org.apache.nifi.processors.gcp.bigquery.PutBigQueryStreaming
org.apache.nifi.processors.gcp.bigquery.PutBigQueryStreaming
org.apache.nifi.processors.gcp.drive.ListGoogleDrive

View File

@ -0,0 +1,256 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.gcp.drive;
import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport;
import com.google.api.client.http.AbstractInputStreamContent;
import com.google.api.client.http.ByteArrayContent;
import com.google.api.client.http.javanet.NetHttpTransport;
import com.google.api.client.json.JsonFactory;
import com.google.api.client.json.gson.GsonFactory;
import com.google.api.services.drive.Drive;
import com.google.api.services.drive.DriveScopes;
import com.google.api.services.drive.model.File;
import org.apache.nifi.processors.gcp.credentials.factory.CredentialPropertyDescriptors;
import org.apache.nifi.processors.gcp.credentials.service.GCPCredentialsControllerService;
import org.apache.nifi.processors.gcp.util.GoogleUtils;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import static org.junit.jupiter.api.Assertions.assertEquals;
/**
* Set the following constants before running:<br />
* <br />
* CREDENTIAL_JSON_FILE_PATH - A Service Account credentials JSON file.<br />
* SHARED_FOLDER_ID - The ID of a Folder that is shared with the Service Account. The test will create files and sub-folders within this folder.<br />
* <br />
* Created files and folders are cleaned up, but it's advisable to dedicate a folder for this test so that it can be cleaned up easily should the test fail to do so.
* <br /><br />
* WARNING: The creation of a file is not a synchronized operation so tests may fail because the processor may not list all of them.
*/
public class ListGoogleDriveIT {
public static final String CREDENTIAL_JSON_FILE_PATH = "";
public static final String SHARED_FOLDER_ID = "";
public static final JsonFactory JSON_FACTORY = GsonFactory.getDefaultInstance();
private TestRunner testRunner;
private Drive driveService;
private String mainFolderId;
@BeforeEach
public void init() throws Exception {
ListGoogleDrive testSubject = new ListGoogleDrive();
testRunner = TestRunners.newTestRunner(testSubject);
GCPCredentialsControllerService gcpCredentialsControllerService = new GCPCredentialsControllerService();
testRunner.addControllerService("gcp_credentials_provider_service", gcpCredentialsControllerService);
testRunner.setProperty(gcpCredentialsControllerService, CredentialPropertyDescriptors.SERVICE_ACCOUNT_JSON_FILE, CREDENTIAL_JSON_FILE_PATH);
testRunner.enableControllerService(gcpCredentialsControllerService);
testRunner.setProperty(GoogleUtils.GCP_CREDENTIALS_PROVIDER_SERVICE, "gcp_credentials_provider_service");
NetHttpTransport httpTransport = GoogleNetHttpTransport.newTrustedTransport();
driveService = new Drive.Builder(
httpTransport,
JSON_FACTORY,
testSubject.getHttpCredentialsAdapter(
testRunner.getProcessContext(),
DriveScopes.all()
)
)
.setApplicationName(this.getClass().getSimpleName())
.build();
File file = createFolder("main", SHARED_FOLDER_ID);
mainFolderId = file.getId();
testRunner.setProperty(ListGoogleDrive.FOLDER_ID, mainFolderId);
}
@AfterEach
void tearDown() throws IOException {
driveService.files()
.delete(mainFolderId)
.execute();
}
@Test
void listFilesFrom3LayerDeepDirectoryTree() throws Exception {
// GIVEN
File main_sub1 = createFolder("main_sub1", mainFolderId);
File main_sub2 = createFolder("main_sub2", mainFolderId);
File main_sub1_sub1 = createFolder("main_sub1_sub1", main_sub1.getId());
createFile("main_file1", mainFolderId);
createFile("main_file2", mainFolderId);
createFile("main_file3", mainFolderId);
createFile("main_sub1_file1", main_sub1.getId());
createFile("main_sub2_file1", main_sub2.getId());
createFile("main_sub2_file2", main_sub2.getId());
createFile("main_sub1_sub1_file1", main_sub1_sub1.getId());
createFile("main_sub1_sub1_file2", main_sub1_sub1.getId());
createFile("main_sub1_sub1_file3", main_sub1_sub1.getId());
Set<String> expectedFileNames = new HashSet<>(Arrays.asList(
"main_file1", "main_file2", "main_file3",
"main_sub1_file1",
"main_sub2_file1", "main_sub2_file2",
"main_sub1_sub1_file1", "main_sub1_sub1_file2", "main_sub1_sub1_file3"
));
// The creation of the files are not (completely) synchronized.
Thread.sleep(2000);
// WHEN
testRunner.run();
// THEN
List<MockFlowFile> successFlowFiles = testRunner.getFlowFilesForRelationship(ListGoogleDrive.REL_SUCCESS);
Set<String> actualFileNames = successFlowFiles.stream()
.map(flowFile -> flowFile.getAttribute("filename"))
.collect(Collectors.toSet());
assertEquals(expectedFileNames, actualFileNames);
// Next, list a sub folder, non-recursively this time. (Changing these properties will clear the Processor state as well
// so all files are eligible for listing again.)
// GIVEN
testRunner.clearTransferState();
expectedFileNames = new HashSet<>(Arrays.asList(
"main_sub1_file1"
));
// WHEN
testRunner.setProperty(ListGoogleDrive.FOLDER_ID, main_sub1.getId());
testRunner.setProperty(ListGoogleDrive.RECURSIVE_SEARCH, "false");
testRunner.run();
// THEN
successFlowFiles = testRunner.getFlowFilesForRelationship(ListGoogleDrive.REL_SUCCESS);
actualFileNames = successFlowFiles.stream()
.map(flowFile -> flowFile.getAttribute("filename"))
.collect(Collectors.toSet());
assertEquals(expectedFileNames, actualFileNames);
}
@Test
void doNotListTooYoungFilesWhenMinAgeIsSet() throws Exception {
// GIVEN
testRunner.setProperty(ListGoogleDrive.MIN_AGE, "15 s");
createFile("main_file", mainFolderId);
// Make sure the file 'arrives' and could be listed
Thread.sleep(5000);
// WHEN
testRunner.run();
// THEN
List<MockFlowFile> successFlowFiles = testRunner.getFlowFilesForRelationship(ListGoogleDrive.REL_SUCCESS);
Set<String> actualFileNames = successFlowFiles.stream()
.map(flowFile -> flowFile.getAttribute("filename"))
.collect(Collectors.toSet());
assertEquals(Collections.emptySet(), actualFileNames);
// Next, wait for another 10+ seconds for MIN_AGE to expire then list again
// GIVEN
Thread.sleep(10000);
Set<String> expectedFileNames = new HashSet<>(Arrays.asList(
"main_file"
));
// WHEN
testRunner.run();
// THEN
successFlowFiles = testRunner.getFlowFilesForRelationship(ListGoogleDrive.REL_SUCCESS);
actualFileNames = successFlowFiles.stream()
.map(flowFile -> flowFile.getAttribute("filename"))
.collect(Collectors.toSet());
assertEquals(expectedFileNames, actualFileNames);
}
private File createFolder(String folderName, String... parentFolderIds) throws IOException {
File fileMetaData = new File();
fileMetaData.setName(folderName);
if (parentFolderIds != null) {
fileMetaData.setParents(Arrays.asList(parentFolderIds));
}
fileMetaData.setMimeType("application/vnd.google-apps.folder");
Drive.Files.Create create = driveService.files()
.create(fileMetaData)
.setFields("id");
File file = create.execute();
return file;
}
private File createFile(String name, String... folderIds) throws IOException {
File fileMetadata = new File();
fileMetadata.setName(name);
fileMetadata.setParents(Arrays.asList(folderIds));
AbstractInputStreamContent content = new ByteArrayContent("text/plain", "test_content".getBytes(StandardCharsets.UTF_8));
Drive.Files.Create create = driveService.files()
.create(fileMetadata, content)
.setFields("id, modifiedTime");
File file = create.execute();
return file;
}
}