diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/pom.xml b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/pom.xml
index 2063da4b54..0517d7023f 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/pom.xml
@@ -65,6 +65,11 @@
1.20.0-SNAPSHOT
provided
+
+ org.apache.nifi
+ nifi-conflict-resolution
+ 1.20.0-SNAPSHOT
+
org.apache.nifi
nifi-mock
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/drive/FetchGoogleDrive.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/drive/FetchGoogleDrive.java
index c958cd4489..40ae075de1 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/drive/FetchGoogleDrive.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/drive/FetchGoogleDrive.java
@@ -183,7 +183,7 @@ public class FetchGoogleDrive extends AbstractProcessor implements GoogleDriveTr
}
private void handleErrorResponse(ProcessSession session, String fileId, FlowFile flowFile, GoogleJsonResponseException e) {
- getLogger().error("Couldn't fetch file with id '{}'", fileId, e);
+ getLogger().error("Fetching File [{}] failed", fileId, e);
flowFile = session.putAttribute(flowFile, ERROR_CODE, "" + e.getStatusCode());
flowFile = session.putAttribute(flowFile, ERROR_MESSAGE, e.getMessage());
@@ -193,7 +193,7 @@ public class FetchGoogleDrive extends AbstractProcessor implements GoogleDriveTr
}
private void handleUnexpectedError(ProcessSession session, FlowFile flowFile, String fileId, Exception e) {
- getLogger().error("Unexpected error while fetching and processing file with id '{}'", fileId, e);
+ getLogger().error("Fetching File [{}] failed", fileId, e);
flowFile = session.putAttribute(flowFile, ERROR_MESSAGE, e.getMessage());
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/drive/PutGoogleDrive.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/drive/PutGoogleDrive.java
index 0276c3a0eb..751a6397fb 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/drive/PutGoogleDrive.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/drive/PutGoogleDrive.java
@@ -23,7 +23,8 @@ import static java.util.Arrays.asList;
import static java.util.Collections.singletonList;
import static java.util.stream.Collectors.joining;
import static org.apache.nifi.processor.util.StandardValidators.DATA_SIZE_VALIDATOR;
-import static org.apache.nifi.processor.util.StandardValidators.createRegexMatchingValidator;
+import static org.apache.nifi.processors.conflict.resolution.ConflictResolutionStrategy.FAIL;
+import static org.apache.nifi.processors.conflict.resolution.ConflictResolutionStrategy.IGNORE;
import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ERROR_CODE;
import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ERROR_CODE_DESC;
import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ERROR_MESSAGE;
@@ -66,7 +67,6 @@ import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
-import java.util.regex.Pattern;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
@@ -90,6 +90,7 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.conflict.resolution.ConflictResolutionStrategy;
import org.apache.nifi.processors.gcp.ProxyAwareTransportFactory;
import org.apache.nifi.proxy.ProxyConfiguration;
import org.json.JSONObject;
@@ -108,34 +109,19 @@ import org.json.JSONObject;
@WritesAttribute(attribute = ERROR_CODE, description = ERROR_CODE_DESC),
@WritesAttribute(attribute = ERROR_MESSAGE, description = ERROR_MESSAGE_DESC)})
public class PutGoogleDrive extends AbstractProcessor implements GoogleDriveTrait {
-
- public static final String IGNORE_RESOLUTION = "ignore";
- public static final String REPLACE_RESOLUTION = "replace";
- public static final String FAIL_RESOLUTION = "fail";
public static final int MIN_ALLOWED_CHUNK_SIZE_IN_BYTES = MediaHttpUploader.MINIMUM_CHUNK_SIZE;
public static final int MAX_ALLOWED_CHUNK_SIZE_IN_BYTES = 1024 * 1024 * 1024;
public static final PropertyDescriptor FOLDER_ID = new PropertyDescriptor.Builder()
.name("folder-id")
.displayName("Folder ID")
- .description("The ID of the shared folder. " +
+ .description("The ID of the shared folder." +
" Please see Additional Details to set up access to Google Drive and obtain Folder ID.")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.required(true)
.build();
- public static final PropertyDescriptor SUBFOLDER_NAME = new PropertyDescriptor.Builder()
- .name("subfolder-name")
- .displayName("Subfolder Name")
- .description("The name (path) of the subfolder where files are uploaded. The subfolder name is relative to the shared folder specified by 'Folder ID'."
- + " Example: subFolder, subFolder1/subfolder2")
- .addValidator(createRegexMatchingValidator(Pattern.compile("^(?!/).+(? PROPERTIES = Collections.unmodifiableList(asList(
GCP_CREDENTIALS_PROVIDER_SERVICE,
FOLDER_ID,
- SUBFOLDER_NAME,
- CREATE_SUBFOLDER,
FILE_NAME,
CONFLICT_RESOLUTION,
CHUNKED_UPLOAD_THRESHOLD,
@@ -260,15 +231,11 @@ public class PutGoogleDrive extends AbstractProcessor implements GoogleDriveTrai
return;
}
- String folderId = context.getProperty(FOLDER_ID).evaluateAttributeExpressions(flowFile).getValue();
- final String subfolderName = context.getProperty(SUBFOLDER_NAME).evaluateAttributeExpressions(flowFile).getValue();
- final boolean createFolder = context.getProperty(CREATE_SUBFOLDER).asBoolean();
+ final String folderId = context.getProperty(FOLDER_ID).evaluateAttributeExpressions(flowFile).getValue();
final String filename = context.getProperty(FILE_NAME).evaluateAttributeExpressions(flowFile).getValue();
final String mimeType = flowFile.getAttribute(CoreAttributes.MIME_TYPE.key());
try {
- folderId = subfolderName != null ? getOrCreateParentSubfolder(subfolderName, folderId, createFolder).getId() : folderId;
-
final long startNanos = System.nanoTime();
final long size = flowFile.getSize();
@@ -280,20 +247,19 @@ public class PutGoogleDrive extends AbstractProcessor implements GoogleDriveTrai
.asDataSize(DataUnit.B)
.intValue();
- final String conflictResolution = context.getProperty(CONFLICT_RESOLUTION).getValue();
-
+ final ConflictResolutionStrategy conflictResolution = ConflictResolutionStrategy.forValue(context.getProperty(CONFLICT_RESOLUTION).getValue());
final Optional alreadyExistingFile = checkFileExistence(filename, folderId);
- final File fileMetadata = alreadyExistingFile.isPresent() ? alreadyExistingFile.get() : createMetadata(filename, folderId);
+ final File fileMetadata = alreadyExistingFile.orElseGet(() -> createMetadata(filename, folderId));
- if (alreadyExistingFile.isPresent() && FAIL_RESOLUTION.equals(conflictResolution)) {
- getLogger().error("File '{}' already exists in {} folder, conflict resolution is '{}'", filename, getFolderName(subfolderName), FAIL_RESOLUTION);
+ if (alreadyExistingFile.isPresent() && conflictResolution == FAIL) {
+ getLogger().error("File [{}] already exists in [{}] Folder, conflict resolution is [{}]", filename, folderId, FAIL.getDisplayName());
flowFile = addAttributes(alreadyExistingFile.get(), flowFile, session);
session.transfer(flowFile, REL_FAILURE);
return;
}
- if (alreadyExistingFile.isPresent() && IGNORE_RESOLUTION.equals(conflictResolution)) {
- getLogger().info("File '{}' already exists in {} folder, conflict resolution is '{}'", filename, getFolderName(subfolderName), IGNORE_RESOLUTION);
+ if (alreadyExistingFile.isPresent() && conflictResolution == IGNORE) {
+ getLogger().info("File [{}] already exists in [{}] Folder, conflict resolution is [{}]", filename, folderId, IGNORE.getDisplayName());
flowFile = addAttributes(alreadyExistingFile.get(), flowFile, session);
session.transfer(flowFile, REL_SUCCESS);
return;
@@ -323,12 +289,12 @@ public class PutGoogleDrive extends AbstractProcessor implements GoogleDriveTrai
}
session.transfer(flowFile, REL_SUCCESS);
} catch (GoogleJsonResponseException e) {
- getLogger().error("Exception occurred while uploading file '{}' to {} Google Drive folder", filename,
- getFolderName(subfolderName), e);
+ getLogger().error("Exception occurred while uploading File [{}] to [{}] Google Drive Folder", filename,
+ folderId, e);
handleExpectedError(session, flowFile, e);
} catch (Exception e) {
- getLogger().error("Exception occurred while uploading file '{}' to {} Google Drive folder", filename,
- getFolderName(subfolderName), e);
+ getLogger().error("Exception occurred while uploading File [{}] to [{}] Google Drive Folder", filename,
+ folderId, e);
if (e.getCause() != null && e.getCause() instanceof GoogleJsonResponseException) {
handleExpectedError(session, flowFile, (GoogleJsonResponseException) e.getCause());
@@ -354,10 +320,6 @@ public class PutGoogleDrive extends AbstractProcessor implements GoogleDriveTrai
return session.putAllAttributes(flowFile, attributes);
}
- private String getFolderName(String subFolderName) {
- return subFolderName == null ? "shared" : format("'%s'", subFolderName);
- }
-
private DriveRequest createDriveRequest(File fileMetadata, final InputStreamContent mediaContent) throws IOException {
if (fileMetadata.getId() == null) {
return driveService.files()
@@ -384,7 +346,8 @@ public class PutGoogleDrive extends AbstractProcessor implements GoogleDriveTrai
fileMetadata.setSize(mediaContent.getLength());
return fileMetadata;
} else {
- throw new ProcessException(format("Upload of file '%s' to folder '%s' failed, HTTP error code: %d", fileMetadata.getName(), fileMetadata.getId(), response.getStatusCode()));
+ throw new ProcessException(format("Upload of File [%s] to Folder [%s] failed, HTTP error code: [%d]",
+ fileMetadata.getName(), fileMetadata.getId(), response.getStatusCode()));
}
}
@@ -395,44 +358,6 @@ public class PutGoogleDrive extends AbstractProcessor implements GoogleDriveTrai
return new JSONObject(contentAsString).getString("id");
}
- private File getOrCreateParentSubfolder(String folderName, String parentFolderId, boolean createFolder) throws IOException {
- final int indexOfPathSeparator = folderName.indexOf("/");
-
- if (isMultiLevelFolder(indexOfPathSeparator, folderName)) {
- final String mainFolderName = folderName.substring(0, indexOfPathSeparator);
- final String subFolders = folderName.substring(indexOfPathSeparator + 1);
- final File mainFolder = getOrCreateFolder(mainFolderName, parentFolderId, createFolder);
- return getOrCreateParentSubfolder(subFolders, mainFolder.getId(), createFolder);
- } else {
- return getOrCreateFolder(folderName, parentFolderId, createFolder);
- }
- }
-
- private boolean isMultiLevelFolder(int indexOfPathSeparator, String folderName) {
- return indexOfPathSeparator > 0 && indexOfPathSeparator < folderName.length() - 1;
- }
-
- private File getOrCreateFolder(String folderName, String parentFolderId, boolean createFolder) throws IOException {
- final Optional existingFolder = checkFolderExistence(folderName, parentFolderId);
-
- if (existingFolder.isPresent()) {
- return existingFolder.get();
- }
-
- if (createFolder) {
- getLogger().debug("Create folder " + folderName + " parent id: " + parentFolderId);
- final File folderMetadata = createMetadata(folderName, parentFolderId);
- folderMetadata.setMimeType(DRIVE_FOLDER_MIME_TYPE);
-
- return driveService.files()
- .create(folderMetadata)
- .setFields("id, parents")
- .execute();
- } else {
- throw new ProcessException(format("The specified subfolder '%s' does not exist and '%s' is false.", folderName, CREATE_SUBFOLDER.getDisplayName()));
- }
- }
-
private File createMetadata(final String name, final String parentId) {
final File metadata = new File();
metadata.setName(name);
@@ -440,18 +365,10 @@ public class PutGoogleDrive extends AbstractProcessor implements GoogleDriveTrai
return metadata;
}
- private Optional checkFolderExistence(String folderName, String parentId) throws IOException {
- return checkObjectExistence(format("mimeType='%s' and name='%s' and ('%s' in parents)", DRIVE_FOLDER_MIME_TYPE, folderName, parentId));
- }
-
private Optional checkFileExistence(String fileName, String parentId) throws IOException {
- return checkObjectExistence(format("name='%s' and ('%s' in parents)", fileName, parentId));
- }
-
- private Optional checkObjectExistence(String query) throws IOException {
final FileList result = driveService.files()
.list()
- .setQ(query)
+ .setQ(format("name='%s' and ('%s' in parents)", fileName, parentId))
.setFields("files(name, id)")
.execute();
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/drive/FetchGoogleDriveIT.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/drive/FetchGoogleDriveIT.java
index 8b10406fc7..9220b4d02e 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/drive/FetchGoogleDriveIT.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/drive/FetchGoogleDriveIT.java
@@ -42,7 +42,6 @@ public class FetchGoogleDriveIT extends AbstractGoogleDriveIT
@Test
void testFetch() throws Exception {
- // GIVEN
File file = createFileWithDefaultContent("test_file.txt", mainFolderId);
Map inputFlowFileAttributes = new HashMap<>();
@@ -54,11 +53,9 @@ public class FetchGoogleDriveIT extends AbstractGoogleDriveIT
HashSet