NIFI-11911 Updated FetchGoogleDrive to support Export Types

- Export API support includes Google Docs, Presentations, Spreadsheets, Drawings, and AppScripts

This closes #7575

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Mark Payne 2023-08-03 18:49:35 -04:00 committed by exceptionfactory
parent f8e3b9ebac
commit 485112e54f
No known key found for this signature in database
GPG Key ID: 29B6A52D2AAE8DBA
1 changed files with 213 additions and 51 deletions

View File

@ -16,6 +16,44 @@
*/
package org.apache.nifi.processors.gcp.drive;
import com.google.api.client.googleapis.json.GoogleJsonResponseException;
import com.google.api.services.drive.Drive;
import com.google.api.services.drive.DriveScopes;
import com.google.api.services.drive.model.File;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.gcp.ProxyAwareTransportFactory;
import org.apache.nifi.processors.gcp.util.GoogleUtils;
import org.apache.nifi.proxy.ProxyConfiguration;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ERROR_CODE;
import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ERROR_CODE_DESC;
import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ERROR_MESSAGE;
@ -29,40 +67,6 @@ import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.SIZE_DE
import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.TIMESTAMP;
import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.TIMESTAMP_DESC;
import com.google.api.client.googleapis.json.GoogleJsonResponseException;
import com.google.api.services.drive.Drive;
import com.google.api.services.drive.DriveScopes;
import com.google.api.services.drive.model.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.gcp.ProxyAwareTransportFactory;
import org.apache.nifi.processors.gcp.util.GoogleUtils;
import org.apache.nifi.proxy.ProxyConfiguration;
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@Tags({"google", "drive", "storage", "fetch"})
@CapabilityDescription("Fetches files from a Google Drive Folder. Designed to be used in tandem with ListGoogleDrive. " +
@ -80,6 +84,62 @@ import org.apache.nifi.proxy.ProxyConfiguration;
})
public class FetchGoogleDrive extends AbstractProcessor implements GoogleDriveTrait {
// Google Docs Export Types
private static final AllowableValue EXPORT_MS_WORD = new AllowableValue("application/vnd.openxmlformats-officedocument.wordprocessingml.document", "Microsoft Word");
private static final AllowableValue EXPORT_OPEN_DOCUMENT = new AllowableValue("application/vnd.oasis.opendocument.text", "OpenDocument");
private static final AllowableValue EXPORT_PDF = new AllowableValue("application/pdf", "PDF");
private static final AllowableValue EXPORT_RICH_TEXT = new AllowableValue("application/rtf", "Rich Text");
private static final AllowableValue EXPORT_EPUB = new AllowableValue("application/epub+zip", "EPUB");
// Shared Export Types
private static final AllowableValue EXPORT_HTML_DOC = new AllowableValue("application/zip", "Web Page (HTML)");
private static final AllowableValue EXPORT_PLAIN_TEXT = new AllowableValue("text/plain", "Plain Text");
// Google Spreadsheet Export Types
private static final AllowableValue EXPORT_MS_EXCEL = new AllowableValue("application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", "Microsoft Excel");
private static final AllowableValue EXPORT_OPEN_SPREADSHEET = new AllowableValue("application/x-vnd.oasis.opendocument.spreadsheet", "OpenDocument Spreadsheet");
private static final AllowableValue EXPORT_PDF_SPREADSHEET = new AllowableValue("application/pdf", "PDF");
private static final AllowableValue EXPORT_CSV = new AllowableValue("text/csv", "CSV (first sheet only)",
"Comma-separated values. Only the first sheet will be exported.");
private static final AllowableValue EXPORT_TSV = new AllowableValue("text/tab-separated-values", "TSV (first sheet only)",
"Tab-separate values. Only the first sheet will be exported.");
private static final AllowableValue EXPORT_HTML_SPREADSHEET = new AllowableValue("text/html", "Web Page (HTML)");
// Google Presentation Export Types
private static final AllowableValue EXPORT_MS_POWERPOINT = new AllowableValue("application/vnd.openxmlformats-officedocument.presentationml.presentation", "Microsoft PowerPoint");
private static final AllowableValue EXPORT_OPEN_PRESENTATION = new AllowableValue("application/vnd.oasis.opendocument.presentation", "OpenDocument Presentation");
private static final AllowableValue EXPORT_PNG = new AllowableValue("image/png", "PNG (first slide only)");
private static final AllowableValue EXPORT_JPEG = new AllowableValue("image/jpeg", "JPEG (first slide only)");
private static final AllowableValue EXPORT_SVG = new AllowableValue("image/svg+xml", "SVG (first slide only)",
"Scalable Vector Graphics. Only the first slide will be exported.");
// Drawings Export Types
private static final AllowableValue EXPORT_PNG_DRAWING = new AllowableValue("image/png", "PNG");
private static final AllowableValue EXPORT_JPEG_DRAWING = new AllowableValue("image/jpeg", "JPEG");
private static final AllowableValue EXPORT_SVG_DRAWING = new AllowableValue("image/svg+xml", "SVG");
private static final Map<String, String> fileExtensions = new HashMap<>();
static {
fileExtensions.put(EXPORT_MS_WORD.getValue(), ".docx");
fileExtensions.put(EXPORT_OPEN_DOCUMENT.getValue(), ".odt");
fileExtensions.put(EXPORT_PDF.getValue(), ".pdf");
fileExtensions.put(EXPORT_RICH_TEXT.getValue(), ".rtf");
fileExtensions.put(EXPORT_EPUB.getValue(), ".epub");
fileExtensions.put(EXPORT_HTML_DOC.getValue(), ".zip");
fileExtensions.put(EXPORT_PLAIN_TEXT.getValue(), ".txt");
fileExtensions.put(EXPORT_MS_EXCEL.getValue(), ".xlsx");
fileExtensions.put(EXPORT_OPEN_SPREADSHEET.getValue(), ".ods");
fileExtensions.put(EXPORT_CSV.getValue(), ".csv");
fileExtensions.put(EXPORT_TSV.getValue(), ".tsv");
fileExtensions.put(EXPORT_MS_POWERPOINT.getValue(), ".pptx");
fileExtensions.put(EXPORT_OPEN_PRESENTATION.getValue(), ".odp");
fileExtensions.put(EXPORT_PNG.getValue(), ".png");
fileExtensions.put(EXPORT_JPEG.getValue(), ".jpg");
fileExtensions.put(EXPORT_SVG.getValue(), ".svg");
fileExtensions.put("application/vnd.google-apps.script+json", ".json");
}
public static final PropertyDescriptor FILE_ID = new PropertyDescriptor
.Builder().name("drive-file-id")
.displayName("File ID")
@ -91,6 +151,53 @@ public class FetchGoogleDrive extends AbstractProcessor implements GoogleDriveTr
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor GOOGLE_DOC_EXPORT_TYPE = new PropertyDescriptor.Builder()
.name("Google Doc Export Type")
.description("Google Documents cannot be downloaded directly from Google Drive but instead must be exported to a specified MIME Type. In the event " +
"that the incoming FlowFile's MIME Type indicates that the file is a Google Document, this property specifies the MIME Type to export the document to.")
.required(true)
.allowableValues(
EXPORT_PDF, EXPORT_PLAIN_TEXT, EXPORT_MS_WORD,
EXPORT_OPEN_DOCUMENT, EXPORT_RICH_TEXT, EXPORT_HTML_DOC, EXPORT_EPUB)
.defaultValue(EXPORT_PDF.getValue())
.build();
public static final PropertyDescriptor GOOGLE_SPREADSHEET_EXPORT_TYPE = new PropertyDescriptor.Builder()
.name("Google Spreadsheet Export Type")
.description("Google Spreadsheets cannot be downloaded directly from Google Drive but instead must be exported to a specified MIME Type. In the event " +
"that the incoming FlowFile's MIME Type indicates that the file is a Google Spreadsheet, this property specifies the MIME Type to export the spreadsheet to.")
.required(true)
.allowableValues(
EXPORT_CSV, EXPORT_MS_EXCEL, EXPORT_PDF_SPREADSHEET,
EXPORT_TSV, EXPORT_HTML_SPREADSHEET, EXPORT_OPEN_SPREADSHEET)
.defaultValue(EXPORT_CSV.getValue())
.build();
public static final PropertyDescriptor GOOGLE_PRESENTATION_EXPORT_TYPE = new PropertyDescriptor.Builder()
.name("Google Presentation Export Type")
.description("Google Presentations cannot be downloaded directly from Google Drive but instead must be exported to a specified MIME Type. In the event " +
"that the incoming FlowFile's MIME Type indicates that the file is a Google Presentation, this property specifies the MIME Type to export the presentation to.")
.required(true)
.allowableValues(
EXPORT_PDF, EXPORT_MS_POWERPOINT, EXPORT_PLAIN_TEXT, EXPORT_OPEN_PRESENTATION,
EXPORT_PNG, EXPORT_JPEG, EXPORT_SVG)
.defaultValue(EXPORT_PDF.getValue())
.build();
public static final PropertyDescriptor GOOGLE_DRAWING_EXPORT_TYPE = new PropertyDescriptor.Builder()
.name("Google Drawing Export Type")
.description("Google Drawings cannot be downloaded directly from Google Drive but instead must be exported to a specified MIME Type. In the event " +
"that the incoming FlowFile's MIME Type indicates that the file is a Google Drawing, this property specifies the MIME Type to export the drawing to.")
.required(true)
.allowableValues(
EXPORT_PDF, EXPORT_PNG_DRAWING, EXPORT_JPEG_DRAWING, EXPORT_SVG_DRAWING)
.defaultValue(EXPORT_PDF.getValue())
.build();
public static final Relationship REL_SUCCESS =
new Relationship.Builder()
.name("success")
@ -103,14 +210,18 @@ public class FetchGoogleDrive extends AbstractProcessor implements GoogleDriveTr
.build();
private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
GoogleUtils.GCP_CREDENTIALS_PROVIDER_SERVICE,
FILE_ID,
ProxyConfiguration.createProxyConfigPropertyDescriptor(false, ProxyAwareTransportFactory.PROXY_SPECS)
GoogleUtils.GCP_CREDENTIALS_PROVIDER_SERVICE,
FILE_ID,
ProxyConfiguration.createProxyConfigPropertyDescriptor(false, ProxyAwareTransportFactory.PROXY_SPECS),
GOOGLE_DOC_EXPORT_TYPE,
GOOGLE_SPREADSHEET_EXPORT_TYPE,
GOOGLE_PRESENTATION_EXPORT_TYPE,
GOOGLE_DRAWING_EXPORT_TYPE
));
public static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
REL_SUCCESS,
REL_FAILURE
REL_SUCCESS,
REL_FAILURE
)));
private volatile Drive driveService;
@ -147,11 +258,12 @@ public class FetchGoogleDrive extends AbstractProcessor implements GoogleDriveTr
final long startNanos = System.nanoTime();
try {
flowFile = fetchFile(fileId, session, flowFile);
final File fileMetadata = fetchFileMetadata(fileId);
final Map<String, String> attributes = createAttributeMap(fileMetadata);
flowFile = session.putAllAttributes(flowFile, attributes);
final Map<String, String> attributeMap = createAttributeMap(fileMetadata);
flowFile = fetchFile(fileId, session, context, flowFile, attributeMap);
flowFile = session.putAllAttributes(flowFile, attributeMap);
final String url = DRIVE_URL + fileMetadata.getId();
final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
@ -164,18 +276,68 @@ public class FetchGoogleDrive extends AbstractProcessor implements GoogleDriveTr
}
}
private FlowFile fetchFile(String fileId, ProcessSession session, FlowFile flowFile) throws IOException {
private String getExportType(final String mimeType, final ProcessContext context) {
if (mimeType == null) {
return null;
}
switch (mimeType) {
case "application/vnd.google-apps.document":
return context.getProperty(GOOGLE_DOC_EXPORT_TYPE).getValue();
case "application/vnd.google-apps.spreadsheet":
return context.getProperty(GOOGLE_SPREADSHEET_EXPORT_TYPE).getValue();
case "application/vnd.google-apps.presentation":
return context.getProperty(GOOGLE_PRESENTATION_EXPORT_TYPE).getValue();
case "application/vnd.google-apps.drawing":
return context.getProperty(GOOGLE_DRAWING_EXPORT_TYPE).getValue();
case "application/vnd.google-apps.script":
return "application/vnd.google-apps.script+json";
default:
return null;
}
}
private FlowFile fetchFile(final String fileId, final ProcessSession session, final ProcessContext context, final FlowFile flowFile, final Map<String, String> attributeMap) throws IOException {
final String mimeType = flowFile.getAttribute(CoreAttributes.MIME_TYPE.key());
final String exportType = getExportType(mimeType, context);
if (exportType == null) {
return downloadFile(fileId, session, flowFile);
}
return exportFile(fileId, exportType, session, flowFile, attributeMap);
}
private FlowFile downloadFile(final String fileId, final ProcessSession session, final FlowFile flowFile) throws IOException {
try (final InputStream driveFileInputStream = driveService
.files()
.get(fileId)
.setSupportsAllDrives(true)
.executeMediaAsInputStream()) {
.files()
.get(fileId)
.setSupportsAllDrives(true)
.executeMediaAsInputStream()) {
return session.importFrom(driveFileInputStream, flowFile);
}
}
private File fetchFileMetadata(String fileId) throws IOException {
private FlowFile exportFile(final String fileId, final String exportMimeType, final ProcessSession session, final FlowFile flowFile, final Map<String, String> attributeMap) throws IOException {
attributeMap.put(CoreAttributes.MIME_TYPE.key(), exportMimeType);
final String fileExtension = fileExtensions.get(exportMimeType);
if (fileExtension != null) {
attributeMap.put(CoreAttributes.FILENAME.key(), flowFile.getAttribute(CoreAttributes.FILENAME.key()) + fileExtension);
}
try (final InputStream driveFileInputStream = driveService
.files()
.export(fileId, exportMimeType)
.executeMediaAsInputStream()) {
return session.importFrom(driveFileInputStream, flowFile);
}
}
private File fetchFileMetadata(final String fileId) throws IOException {
return driveService
.files()
.get(fileId)
@ -184,7 +346,7 @@ public class FetchGoogleDrive extends AbstractProcessor implements GoogleDriveTr
.execute();
}
private void handleErrorResponse(ProcessSession session, String fileId, FlowFile flowFile, GoogleJsonResponseException e) {
private void handleErrorResponse(final ProcessSession session, final String fileId, FlowFile flowFile, final GoogleJsonResponseException e) {
getLogger().error("Fetching File [{}] failed", fileId, e);
flowFile = session.putAttribute(flowFile, ERROR_CODE, "" + e.getStatusCode());
@ -194,7 +356,7 @@ public class FetchGoogleDrive extends AbstractProcessor implements GoogleDriveTr
session.transfer(flowFile, REL_FAILURE);
}
private void handleUnexpectedError(ProcessSession session, FlowFile flowFile, String fileId, Exception e) {
private void handleUnexpectedError(final ProcessSession session, FlowFile flowFile, final String fileId, final Exception e) {
getLogger().error("Fetching File [{}] failed", fileId, e);
flowFile = session.putAttribute(flowFile, ERROR_MESSAGE, e.getMessage());