From 1d5a1bff0857542ac8ade18169cf3e4b9a5c3dd2 Mon Sep 17 00:00:00 2001 From: krisztina-zsihovszki Date: Wed, 30 Nov 2022 14:39:51 +0100 Subject: [PATCH] NIFI-10868 PutDropbox processor This closes #6740. Signed-off-by: Peter Turcsanyi --- .../processors/dropbox/DropboxAttributes.java | 40 ++ .../processors/dropbox/DropboxFileInfo.java | 14 +- .../dropbox/DropboxFlowFileAttribute.java | 12 +- .../nifi/processors/dropbox/DropboxTrait.java | 42 +- .../nifi/processors/dropbox/FetchDropbox.java | 81 +++- .../nifi/processors/dropbox/ListDropbox.java | 48 +-- .../nifi/processors/dropbox/PutDropbox.java | 358 ++++++++++++++++++ .../org.apache.nifi.processor.Processor | 1 + .../dropbox/AbstractDropboxTest.java | 128 +++++++ .../processors/dropbox/FetchDropboxTest.java | 88 ++--- .../processors/dropbox/ListDropboxTest.java | 95 +---- .../nifi/processors/dropbox/PutDropboxIT.java | 153 ++++++++ .../processors/dropbox/PutDropboxTest.java | 333 ++++++++++++++++ 13 files changed, 1195 insertions(+), 198 deletions(-) create mode 100644 nifi-nar-bundles/nifi-dropbox-bundle/nifi-dropbox-processors/src/main/java/org/apache/nifi/processors/dropbox/DropboxAttributes.java create mode 100644 nifi-nar-bundles/nifi-dropbox-bundle/nifi-dropbox-processors/src/main/java/org/apache/nifi/processors/dropbox/PutDropbox.java create mode 100644 nifi-nar-bundles/nifi-dropbox-bundle/nifi-dropbox-processors/src/test/java/org/apache/nifi/processors/dropbox/AbstractDropboxTest.java create mode 100644 nifi-nar-bundles/nifi-dropbox-bundle/nifi-dropbox-processors/src/test/java/org/apache/nifi/processors/dropbox/PutDropboxIT.java create mode 100644 nifi-nar-bundles/nifi-dropbox-bundle/nifi-dropbox-processors/src/test/java/org/apache/nifi/processors/dropbox/PutDropboxTest.java diff --git a/nifi-nar-bundles/nifi-dropbox-bundle/nifi-dropbox-processors/src/main/java/org/apache/nifi/processors/dropbox/DropboxAttributes.java b/nifi-nar-bundles/nifi-dropbox-bundle/nifi-dropbox-processors/src/main/java/org/apache/nifi/processors/dropbox/DropboxAttributes.java new file mode 100644 index 0000000000..29d9e4410c --- /dev/null +++ b/nifi-nar-bundles/nifi-dropbox-bundle/nifi-dropbox-processors/src/main/java/org/apache/nifi/processors/dropbox/DropboxAttributes.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.dropbox; + +public class DropboxAttributes { + public static final String ID = "dropbox.id"; + public static final String ID_DESC = "The Dropbox identifier of the file"; + + public static final String PATH = "path"; + public static final String PATH_DESC = "The folder path where the file is located"; + + public static final String FILENAME = "filename"; + public static final String FILENAME_DESC = "The name of the file"; + + public static final String SIZE = "dropbox.size"; + public static final String SIZE_DESC = "The size of the file"; + + public static final String TIMESTAMP = "dropbox.timestamp"; + public static final String TIMESTAMP_DESC = "The server modified time, when the file was uploaded to Dropbox"; + + public static final String REVISION = "dropbox.revision"; + public static final String REVISION_DESC = "Revision of the file"; + + public static final String ERROR_MESSAGE = "error.message"; + public static final String ERROR_MESSAGE_DESC = "The error message returned by Dropbox when the fetch of a file fails"; +} diff --git a/nifi-nar-bundles/nifi-dropbox-bundle/nifi-dropbox-processors/src/main/java/org/apache/nifi/processors/dropbox/DropboxFileInfo.java b/nifi-nar-bundles/nifi-dropbox-bundle/nifi-dropbox-processors/src/main/java/org/apache/nifi/processors/dropbox/DropboxFileInfo.java index 22eda85a51..0d1dfc0018 100644 --- a/nifi-nar-bundles/nifi-dropbox-bundle/nifi-dropbox-processors/src/main/java/org/apache/nifi/processors/dropbox/DropboxFileInfo.java +++ b/nifi-nar-bundles/nifi-dropbox-bundle/nifi-dropbox-processors/src/main/java/org/apache/nifi/processors/dropbox/DropboxFileInfo.java @@ -16,6 +16,13 @@ */ package org.apache.nifi.processors.dropbox; +import static org.apache.nifi.processors.dropbox.DropboxAttributes.FILENAME; +import static org.apache.nifi.processors.dropbox.DropboxAttributes.ID; +import static org.apache.nifi.processors.dropbox.DropboxAttributes.PATH; +import static org.apache.nifi.processors.dropbox.DropboxAttributes.REVISION; +import static org.apache.nifi.processors.dropbox.DropboxAttributes.SIZE; +import static org.apache.nifi.processors.dropbox.DropboxAttributes.TIMESTAMP; + import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -31,13 +38,6 @@ import org.apache.nifi.serialization.record.RecordSchema; public class DropboxFileInfo implements ListableEntity { - public static final String ID = "dropbox.id"; - public static final String PATH = "path"; - public static final String FILENAME = "filename"; - public static final String SIZE = "dropbox.size"; - public static final String TIMESTAMP = "dropbox.timestamp"; - public static final String REVISION = "dropbox.revision"; - private static final RecordSchema SCHEMA; static { diff --git a/nifi-nar-bundles/nifi-dropbox-bundle/nifi-dropbox-processors/src/main/java/org/apache/nifi/processors/dropbox/DropboxFlowFileAttribute.java b/nifi-nar-bundles/nifi-dropbox-bundle/nifi-dropbox-processors/src/main/java/org/apache/nifi/processors/dropbox/DropboxFlowFileAttribute.java index 0a5898cb2a..12e7bd4d6e 100644 --- a/nifi-nar-bundles/nifi-dropbox-bundle/nifi-dropbox-processors/src/main/java/org/apache/nifi/processors/dropbox/DropboxFlowFileAttribute.java +++ b/nifi-nar-bundles/nifi-dropbox-bundle/nifi-dropbox-processors/src/main/java/org/apache/nifi/processors/dropbox/DropboxFlowFileAttribute.java @@ -19,12 +19,12 @@ package org.apache.nifi.processors.dropbox; import java.util.function.Function; public enum DropboxFlowFileAttribute { - ID(DropboxFileInfo.ID, DropboxFileInfo::getId), - PATH(DropboxFileInfo.PATH, DropboxFileInfo::getPath), - FILENAME(DropboxFileInfo.FILENAME, DropboxFileInfo::getName), - SIZE(DropboxFileInfo.SIZE, fileInfo -> String.valueOf(fileInfo.getSize())), - TIMESTAMP(DropboxFileInfo.TIMESTAMP, fileInfo -> String.valueOf(fileInfo.getTimestamp())), - REVISION(DropboxFileInfo.REVISION, DropboxFileInfo::getRevision); + ID(DropboxAttributes.ID, DropboxFileInfo::getId), + PATH(DropboxAttributes.PATH, DropboxFileInfo::getPath), + FILENAME(DropboxAttributes.FILENAME, DropboxFileInfo::getName), + SIZE(DropboxAttributes.SIZE, fileInfo -> String.valueOf(fileInfo.getSize())), + TIMESTAMP(DropboxAttributes.TIMESTAMP, fileInfo -> String.valueOf(fileInfo.getTimestamp())), + REVISION(DropboxAttributes.REVISION, DropboxFileInfo::getRevision); private final String name; private final Function fromFileInfo; diff --git a/nifi-nar-bundles/nifi-dropbox-bundle/nifi-dropbox-processors/src/main/java/org/apache/nifi/processors/dropbox/DropboxTrait.java b/nifi-nar-bundles/nifi-dropbox-bundle/nifi-dropbox-processors/src/main/java/org/apache/nifi/processors/dropbox/DropboxTrait.java index 6e8548787a..32d779c3bf 100644 --- a/nifi-nar-bundles/nifi-dropbox-bundle/nifi-dropbox-processors/src/main/java/org/apache/nifi/processors/dropbox/DropboxTrait.java +++ b/nifi-nar-bundles/nifi-dropbox-bundle/nifi-dropbox-processors/src/main/java/org/apache/nifi/processors/dropbox/DropboxTrait.java @@ -16,12 +16,18 @@ */ package org.apache.nifi.processors.dropbox; +import static java.lang.String.format; +import static java.lang.String.valueOf; + import com.dropbox.core.DbxRequestConfig; import com.dropbox.core.http.HttpRequestor; import com.dropbox.core.http.OkHttp3Requestor; import com.dropbox.core.oauth.DbxCredential; import com.dropbox.core.v2.DbxClientV2; +import com.dropbox.core.v2.files.FileMetadata; import java.net.Proxy; +import java.util.HashMap; +import java.util.Map; import okhttp3.Credentials; import okhttp3.OkHttpClient; import org.apache.nifi.components.PropertyDescriptor; @@ -32,6 +38,8 @@ import org.apache.nifi.proxy.ProxyConfiguration; public interface DropboxTrait { + String DROPBOX_HOME_URL = "https://www.dropbox.com/home"; + PropertyDescriptor CREDENTIAL_SERVICE = new PropertyDescriptor.Builder() .name("dropbox-credential-service") .displayName("Dropbox Credential Service") @@ -41,9 +49,10 @@ public interface DropboxTrait { .required(true) .build(); - - default DbxClientV2 getDropboxApiClient(ProcessContext context, ProxyConfiguration proxyConfiguration, String clientId) { - OkHttpClient.Builder okHttpClientBuilder = OkHttp3Requestor.defaultOkHttpClientBuilder(); + default DbxClientV2 getDropboxApiClient(ProcessContext context, String identifier) { + final ProxyConfiguration proxyConfiguration = ProxyConfiguration.getConfiguration(context); + final String dropboxClientId = format("%s-%s", getClass().getSimpleName(), identifier); + final OkHttpClient.Builder okHttpClientBuilder = OkHttp3Requestor.defaultOkHttpClientBuilder(); if (!Proxy.Type.DIRECT.equals(proxyConfiguration.getProxyType())) { okHttpClientBuilder.proxy(proxyConfiguration.createProxy()); @@ -58,16 +67,37 @@ public interface DropboxTrait { } } - HttpRequestor httpRequestor = new OkHttp3Requestor(okHttpClientBuilder.build()); - DbxRequestConfig config = DbxRequestConfig.newBuilder(clientId) + final HttpRequestor httpRequestor = new OkHttp3Requestor(okHttpClientBuilder.build()); + final DbxRequestConfig config = DbxRequestConfig.newBuilder(dropboxClientId) .withHttpRequestor(httpRequestor) .build(); final DropboxCredentialService credentialService = context.getProperty(CREDENTIAL_SERVICE) .asControllerService(DropboxCredentialService.class); - DropboxCredentialDetails credential = credentialService.getDropboxCredential(); + final DropboxCredentialDetails credential = credentialService.getDropboxCredential(); return new DbxClientV2(config, new DbxCredential(credential.getAccessToken(), -1L, credential.getRefreshToken(), credential.getAppKey(), credential.getAppSecret())); } + + default String convertFolderName(String folderName) { + return "/".equals(folderName) ? "" : folderName; + } + + default String getParentPath(String fullPath) { + final int idx = fullPath.lastIndexOf("/"); + final String parentPath = fullPath.substring(0, idx); + return "".equals(parentPath) ? "/" : parentPath; + } + + default Map createAttributeMap(FileMetadata fileMetadata) { + final Map attributes = new HashMap<>(); + attributes.put(DropboxAttributes.ID, fileMetadata.getId()); + attributes.put(DropboxAttributes.PATH, getParentPath(fileMetadata.getPathDisplay())); + attributes.put(DropboxAttributes.FILENAME, fileMetadata.getName()); + attributes.put(DropboxAttributes.SIZE, valueOf(fileMetadata.getSize())); + attributes.put(DropboxAttributes.REVISION, fileMetadata.getRev()); + attributes.put(DropboxAttributes.TIMESTAMP, valueOf(fileMetadata.getServerModified().getTime())); + return attributes; + } } diff --git a/nifi-nar-bundles/nifi-dropbox-bundle/nifi-dropbox-processors/src/main/java/org/apache/nifi/processors/dropbox/FetchDropbox.java b/nifi-nar-bundles/nifi-dropbox-bundle/nifi-dropbox-processors/src/main/java/org/apache/nifi/processors/dropbox/FetchDropbox.java index 3f8c235b0e..0499f18947 100644 --- a/nifi-nar-bundles/nifi-dropbox-bundle/nifi-dropbox-processors/src/main/java/org/apache/nifi/processors/dropbox/FetchDropbox.java +++ b/nifi-nar-bundles/nifi-dropbox-bundle/nifi-dropbox-processors/src/main/java/org/apache/nifi/processors/dropbox/FetchDropbox.java @@ -16,16 +16,33 @@ */ package org.apache.nifi.processors.dropbox; -import static java.lang.String.format; +import static org.apache.nifi.processors.dropbox.DropboxAttributes.ERROR_MESSAGE; +import static org.apache.nifi.processors.dropbox.DropboxAttributes.ERROR_MESSAGE_DESC; +import static org.apache.nifi.processors.dropbox.DropboxAttributes.FILENAME; +import static org.apache.nifi.processors.dropbox.DropboxAttributes.FILENAME_DESC; +import static org.apache.nifi.processors.dropbox.DropboxAttributes.ID; +import static org.apache.nifi.processors.dropbox.DropboxAttributes.ID_DESC; +import static org.apache.nifi.processors.dropbox.DropboxAttributes.PATH; +import static org.apache.nifi.processors.dropbox.DropboxAttributes.PATH_DESC; +import static org.apache.nifi.processors.dropbox.DropboxAttributes.REVISION; +import static org.apache.nifi.processors.dropbox.DropboxAttributes.REVISION_DESC; +import static org.apache.nifi.processors.dropbox.DropboxAttributes.SIZE; +import static org.apache.nifi.processors.dropbox.DropboxAttributes.SIZE_DESC; +import static org.apache.nifi.processors.dropbox.DropboxAttributes.TIMESTAMP; +import static org.apache.nifi.processors.dropbox.DropboxAttributes.TIMESTAMP_DESC; +import com.dropbox.core.DbxDownloader; import com.dropbox.core.DbxException; import com.dropbox.core.v2.DbxClientV2; +import com.dropbox.core.v2.files.FileMetadata; import java.io.InputStream; import java.util.Arrays; import java.util.Collections; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; +import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.WritesAttribute; @@ -34,6 +51,7 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnUnscheduled; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.flowfile.FlowFile; @@ -49,15 +67,18 @@ import org.apache.nifi.proxy.ProxySpec; @InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) @Tags({"dropbox", "storage", "fetch"}) @CapabilityDescription("Fetches files from Dropbox. Designed to be used in tandem with ListDropbox.") -@WritesAttribute(attribute = "error.message", description = "When a FlowFile is routed to 'failure', this attribute is added indicating why the file could " - + "not be fetched from Dropbox.") -@SeeAlso(ListDropbox.class) -@WritesAttributes( - @WritesAttribute(attribute = FetchDropbox.ERROR_MESSAGE_ATTRIBUTE, description = "The error message returned by Dropbox when the fetch of a file fails.")) +@SeeAlso({PutDropbox.class, ListDropbox.class}) +@WritesAttributes({ + @WritesAttribute(attribute = ERROR_MESSAGE, description = ERROR_MESSAGE_DESC), + @WritesAttribute(attribute = ID, description = ID_DESC), + @WritesAttribute(attribute = PATH, description = PATH_DESC), + @WritesAttribute(attribute = FILENAME, description = FILENAME_DESC), + @WritesAttribute(attribute = SIZE, description = SIZE_DESC), + @WritesAttribute(attribute = TIMESTAMP, description = TIMESTAMP_DESC), + @WritesAttribute(attribute = REVISION, description = REVISION_DESC)} +) public class FetchDropbox extends AbstractProcessor implements DropboxTrait { - public static final String ERROR_MESSAGE_ATTRIBUTE = "error.message"; - public static final PropertyDescriptor FILE = new PropertyDescriptor .Builder().name("file") .displayName("File") @@ -83,7 +104,7 @@ public class FetchDropbox extends AbstractProcessor implements DropboxTrait { .description("A FlowFile will be routed here for each File for which fetch was attempted but failed.") .build(); - public static final Set relationships = Collections.unmodifiableSet(new HashSet<>(Arrays.asList( + public static final Set RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList( REL_SUCCESS, REL_FAILURE ))); @@ -96,9 +117,11 @@ public class FetchDropbox extends AbstractProcessor implements DropboxTrait { private DbxClientV2 dropboxApiClient; + private DbxDownloader dbxDownloader; + @Override public Set getRelationships() { - return relationships; + return RELATIONSHIPS; } @Override @@ -108,14 +131,19 @@ public class FetchDropbox extends AbstractProcessor implements DropboxTrait { @OnScheduled public void onScheduled(final ProcessContext context) { - final ProxyConfiguration proxyConfiguration = ProxyConfiguration.getConfiguration(context); - String dropboxClientId = format("%s-%s", getClass().getSimpleName(), getIdentifier()); - dropboxApiClient = getDropboxApiClient(context, proxyConfiguration, dropboxClientId); + dropboxApiClient = getDropboxApiClient(context, getIdentifier()); + } + + @OnUnscheduled + public void shutdown() { + if (dbxDownloader != null) { + dbxDownloader.close(); + } } @Override public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { - FlowFile flowFile = session.get(); + final FlowFile flowFile = session.get(); if (flowFile == null) { return; } @@ -124,24 +152,35 @@ public class FetchDropbox extends AbstractProcessor implements DropboxTrait { fileIdentifier = correctFilePath(fileIdentifier); FlowFile outFlowFile = flowFile; + final long startNanos = System.nanoTime(); try { - fetchFile(fileIdentifier, session, outFlowFile); + FileMetadata fileMetadata = fetchFile(fileIdentifier, session, outFlowFile); + + final Map attributes = createAttributeMap(fileMetadata); + outFlowFile = session.putAllAttributes(outFlowFile, attributes); + String url = DROPBOX_HOME_URL + fileMetadata.getPathDisplay(); + final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos); + session.getProvenanceReporter().fetch(flowFile, url, transferMillis); + session.transfer(outFlowFile, REL_SUCCESS); } catch (Exception e) { handleError(session, flowFile, fileIdentifier, e); } } - private void fetchFile(String fileId, ProcessSession session, FlowFile outFlowFile) throws DbxException { - InputStream dropboxInputStream = dropboxApiClient.files() - .download(fileId) - .getInputStream(); - session.importFrom(dropboxInputStream, outFlowFile); + private FileMetadata fetchFile(String fileId, ProcessSession session, FlowFile outFlowFile) throws DbxException { + try (DbxDownloader downloader = dropboxApiClient.files().download(fileId)) { + dbxDownloader = downloader; + final InputStream dropboxInputStream = downloader.getInputStream(); + session.importFrom(dropboxInputStream, outFlowFile); + return downloader.getResult(); + } } private void handleError(ProcessSession session, FlowFile flowFile, String fileId, Exception e) { getLogger().error("Error while fetching and processing file with id '{}'", fileId, e); - FlowFile outFlowFile = session.putAttribute(flowFile, ERROR_MESSAGE_ATTRIBUTE, e.getMessage()); + final FlowFile outFlowFile = session.putAttribute(flowFile, ERROR_MESSAGE, e.getMessage()); + session.penalize(outFlowFile); session.transfer(outFlowFile, REL_FAILURE); } diff --git a/nifi-nar-bundles/nifi-dropbox-bundle/nifi-dropbox-processors/src/main/java/org/apache/nifi/processors/dropbox/ListDropbox.java b/nifi-nar-bundles/nifi-dropbox-bundle/nifi-dropbox-processors/src/main/java/org/apache/nifi/processors/dropbox/ListDropbox.java index 9f99b3839a..a2c4fc4ffb 100644 --- a/nifi-nar-bundles/nifi-dropbox-bundle/nifi-dropbox-processors/src/main/java/org/apache/nifi/processors/dropbox/ListDropbox.java +++ b/nifi-nar-bundles/nifi-dropbox-bundle/nifi-dropbox-processors/src/main/java/org/apache/nifi/processors/dropbox/ListDropbox.java @@ -18,6 +18,18 @@ package org.apache.nifi.processors.dropbox; import static java.lang.String.format; import static java.util.stream.Collectors.toList; +import static org.apache.nifi.processors.dropbox.DropboxAttributes.FILENAME; +import static org.apache.nifi.processors.dropbox.DropboxAttributes.FILENAME_DESC; +import static org.apache.nifi.processors.dropbox.DropboxAttributes.ID; +import static org.apache.nifi.processors.dropbox.DropboxAttributes.ID_DESC; +import static org.apache.nifi.processors.dropbox.DropboxAttributes.PATH; +import static org.apache.nifi.processors.dropbox.DropboxAttributes.PATH_DESC; +import static org.apache.nifi.processors.dropbox.DropboxAttributes.REVISION; +import static org.apache.nifi.processors.dropbox.DropboxAttributes.REVISION_DESC; +import static org.apache.nifi.processors.dropbox.DropboxAttributes.SIZE; +import static org.apache.nifi.processors.dropbox.DropboxAttributes.SIZE_DESC; +import static org.apache.nifi.processors.dropbox.DropboxAttributes.TIMESTAMP; +import static org.apache.nifi.processors.dropbox.DropboxAttributes.TIMESTAMP_DESC; import com.dropbox.core.DbxException; import com.dropbox.core.v2.DbxClientV2; @@ -69,17 +81,19 @@ import org.apache.nifi.serialization.record.RecordSchema; " This Processor is designed to run on Primary Node only in a cluster. If the primary node changes, the new Primary Node will pick up where the" + " previous node left off without duplicating all of the data.") @InputRequirement(Requirement.INPUT_FORBIDDEN) -@WritesAttributes({@WritesAttribute(attribute = DropboxFileInfo.ID, description = "The Dropbox identifier of the file"), - @WritesAttribute(attribute = DropboxFileInfo.PATH, description = "The folder path where the file is located"), - @WritesAttribute(attribute = DropboxFileInfo.FILENAME, description = "The name of the file"), - @WritesAttribute(attribute = DropboxFileInfo.SIZE, description = "The size of the file"), - @WritesAttribute(attribute = DropboxFileInfo.TIMESTAMP, description = "The server modified time, when the file was uploaded to Dropbox"), - @WritesAttribute(attribute = DropboxFileInfo.REVISION, description = "Revision of the file")}) +@WritesAttributes({ + @WritesAttribute(attribute = ID, description = ID_DESC), + @WritesAttribute(attribute = PATH, description = PATH_DESC), + @WritesAttribute(attribute = FILENAME, description = FILENAME_DESC), + @WritesAttribute(attribute = SIZE, description = SIZE_DESC), + @WritesAttribute(attribute = TIMESTAMP, description = TIMESTAMP_DESC), + @WritesAttribute(attribute = REVISION, description = REVISION_DESC)}) @Stateful(scopes = {Scope.CLUSTER}, description = "The processor stores necessary data to be able to keep track what files have been listed already. " + "What exactly needs to be stored depends on the 'Listing Strategy'.") -@SeeAlso(FetchDropbox.class) +@SeeAlso({FetchDropbox.class, PutDropbox.class}) @DefaultSchedule(strategy = SchedulingStrategy.TIMER_DRIVEN, period = "1 min") public class ListDropbox extends AbstractListProcessor implements DropboxTrait { + public static final PropertyDescriptor FOLDER = new PropertyDescriptor.Builder() .name("folder") .displayName("Folder") @@ -148,9 +162,7 @@ public class ListDropbox extends AbstractListProcessor implemen @OnScheduled public void onScheduled(final ProcessContext context) { - final ProxyConfiguration proxyConfiguration = ProxyConfiguration.getConfiguration(context); - String dropboxClientId = format("%s-%s", getClass().getSimpleName(), getIdentifier()); - dropboxApiClient = getDropboxApiClient(context, proxyConfiguration, dropboxClientId); + dropboxApiClient = getDropboxApiClient(context, getIdentifier()); } @Override @@ -189,20 +201,20 @@ public class ListDropbox extends AbstractListProcessor implemen try { Predicate metadataFilter = createMetadataFilter(minTimestamp, minAge); - ListFolderBuilder listFolderBuilder = dropboxApiClient.files().listFolderBuilder(convertFolderName(folderName)); + final ListFolderBuilder listFolderBuilder = dropboxApiClient.files().listFolderBuilder(convertFolderName(folderName)); ListFolderResult result = listFolderBuilder .withRecursive(recursive) .start(); - List metadataList = new ArrayList<>(filterMetadata(result, metadataFilter)); + final List metadataList = new ArrayList<>(filterMetadata(result, metadataFilter)); while (result.getHasMore()) { result = dropboxApiClient.files().listFolderContinue(result.getCursor()); metadataList.addAll(filterMetadata(result, metadataFilter)); } - for (FileMetadata metadata : metadataList) { - DropboxFileInfo.Builder builder = new DropboxFileInfo.Builder() + for (final FileMetadata metadata : metadataList) { + final DropboxFileInfo.Builder builder = new DropboxFileInfo.Builder() .id(metadata.getId()) .path(getParentPath(metadata.getPathDisplay())) .name(metadata.getName()) @@ -268,13 +280,5 @@ public class ListDropbox extends AbstractListProcessor implemen .collect(toList()); } - private String getParentPath(String fullPath) { - int idx = fullPath.lastIndexOf("/"); - String parentPath = fullPath.substring(0, idx); - return "".equals(parentPath) ? "/" : parentPath; - } - private String convertFolderName(String folderName) { - return "/".equals(folderName) ? "" : folderName; - } } diff --git a/nifi-nar-bundles/nifi-dropbox-bundle/nifi-dropbox-processors/src/main/java/org/apache/nifi/processors/dropbox/PutDropbox.java b/nifi-nar-bundles/nifi-dropbox-bundle/nifi-dropbox-processors/src/main/java/org/apache/nifi/processors/dropbox/PutDropbox.java new file mode 100644 index 0000000000..124af41d37 --- /dev/null +++ b/nifi-nar-bundles/nifi-dropbox-bundle/nifi-dropbox-processors/src/main/java/org/apache/nifi/processors/dropbox/PutDropbox.java @@ -0,0 +1,358 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.dropbox; + +import static java.lang.String.format; +import static org.apache.nifi.processors.dropbox.DropboxAttributes.ERROR_MESSAGE; +import static org.apache.nifi.processors.dropbox.DropboxAttributes.ERROR_MESSAGE_DESC; +import static org.apache.nifi.processors.dropbox.DropboxAttributes.FILENAME; +import static org.apache.nifi.processors.dropbox.DropboxAttributes.FILENAME_DESC; +import static org.apache.nifi.processors.dropbox.DropboxAttributes.ID; +import static org.apache.nifi.processors.dropbox.DropboxAttributes.ID_DESC; +import static org.apache.nifi.processors.dropbox.DropboxAttributes.PATH; +import static org.apache.nifi.processors.dropbox.DropboxAttributes.PATH_DESC; +import static org.apache.nifi.processors.dropbox.DropboxAttributes.REVISION; +import static org.apache.nifi.processors.dropbox.DropboxAttributes.REVISION_DESC; +import static org.apache.nifi.processors.dropbox.DropboxAttributes.SIZE; +import static org.apache.nifi.processors.dropbox.DropboxAttributes.SIZE_DESC; +import static org.apache.nifi.processors.dropbox.DropboxAttributes.TIMESTAMP; +import static org.apache.nifi.processors.dropbox.DropboxAttributes.TIMESTAMP_DESC; + +import com.dropbox.core.DbxException; +import com.dropbox.core.DbxUploader; +import com.dropbox.core.RateLimitException; +import com.dropbox.core.v2.DbxClientV2; +import com.dropbox.core.v2.files.CommitInfo; +import com.dropbox.core.v2.files.FileMetadata; +import com.dropbox.core.v2.files.UploadErrorException; +import com.dropbox.core.v2.files.UploadSessionAppendV2Uploader; +import com.dropbox.core.v2.files.UploadSessionCursor; +import com.dropbox.core.v2.files.UploadSessionFinishUploader; +import com.dropbox.core.v2.files.UploadSessionStartUploader; +import com.dropbox.core.v2.files.UploadUploader; +import com.dropbox.core.v2.files.WriteMode; +import java.io.IOException; +import java.io.InputStream; +import java.util.Arrays; +import java.util.Collections; +import java.util.Date; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.regex.Pattern; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.ReadsAttribute; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnUnscheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.DataUnit; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.proxy.ProxyConfiguration; +import org.apache.nifi.proxy.ProxySpec; + +/** + * This processor uploads objects to Dropbox. + */ +@SeeAlso({ListDropbox.class, FetchDropbox.class}) +@InputRequirement(Requirement.INPUT_REQUIRED) +@Tags({"dropbox", "storage", "put"}) +@CapabilityDescription("Puts content to a Dropbox folder.") +@ReadsAttribute(attribute = "filename", description = "Uses the FlowFile's filename as the filename for the Dropbox object.") +@WritesAttributes({ + @WritesAttribute(attribute = ERROR_MESSAGE, description = ERROR_MESSAGE_DESC), + @WritesAttribute(attribute = ID, description = ID_DESC), + @WritesAttribute(attribute = PATH, description = PATH_DESC), + @WritesAttribute(attribute = FILENAME, description = FILENAME_DESC), + @WritesAttribute(attribute = SIZE, description = SIZE_DESC), + @WritesAttribute(attribute = TIMESTAMP, description = TIMESTAMP_DESC), + @WritesAttribute(attribute = REVISION, description = REVISION_DESC)}) +public class PutDropbox extends AbstractProcessor implements DropboxTrait { + + public static final int SINGLE_UPLOAD_LIMIT_IN_BYTES = 150 * 1024 * 1024; + + public static final String IGNORE_RESOLUTION = "ignore"; + public static final String OVERWRITE_RESOLUTION = "overwrite"; + public static final String FAIL_RESOLUTION = "fail"; + + public static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("Files that have been successfully written to Dropbox are transferred to this relationship.") + .build(); + + public static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("Files that could not be written to Dropbox for some reason are transferred to this relationship.") + .build(); + + public static final PropertyDescriptor FOLDER = new PropertyDescriptor.Builder() + .name("folder") + .displayName("Folder") + .description("The path of the Dropbox folder to upload files to. " + + "The folder will be created if it does not exist yet.") + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .required(true) + .addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("/.*"))) + .defaultValue("/") + .build(); + + public static final PropertyDescriptor FILE_NAME = new PropertyDescriptor.Builder() + .name("file-name") + .displayName("Filename") + .description("The full name of the file to upload.") + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .defaultValue("${filename}") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor CONFLICT_RESOLUTION = new PropertyDescriptor.Builder() + .name("conflict-resolution-strategy") + .displayName("Conflict Resolution Strategy") + .description("Indicates what should happen when a file with the same name already exists in the specified Dropbox folder.") + .required(true) + .defaultValue(FAIL_RESOLUTION) + .allowableValues(FAIL_RESOLUTION, IGNORE_RESOLUTION, OVERWRITE_RESOLUTION) + .build(); + + public static final PropertyDescriptor CHUNKED_UPLOAD_SIZE = new PropertyDescriptor.Builder() + .name("chunked-upload-size") + .displayName("Chunked Upload Size") + .description("Defines the size of a chunk. Used when a FlowFile's size exceeds 'Chunked Upload Threshold' and content is uploaded in smaller chunks. " + + "It is recommended to specify chunked upload size smaller than 'Chunked Upload Threshold' and as multiples of 4 MB. " + + "Maximum allowed value is 150 MB.") + .defaultValue("8 MB") + .addValidator(StandardValidators.createDataSizeBoundsValidator(1, SINGLE_UPLOAD_LIMIT_IN_BYTES)) + .required(false) + .build(); + + public static final PropertyDescriptor CHUNKED_UPLOAD_THRESHOLD = new PropertyDescriptor.Builder() + .name("chunked-upload-threshold") + .displayName("Chunked Upload Threshold") + .description("The maximum size of the content which is uploaded at once. FlowFiles larger than this threshold are uploaded in chunks. " + + "Maximum allowed value is 150 MB.") + .defaultValue("150 MB") + .addValidator(StandardValidators.createDataSizeBoundsValidator(1, SINGLE_UPLOAD_LIMIT_IN_BYTES)) + .required(false) + .build(); + + private static final List PROPERTIES = Collections.unmodifiableList(Arrays.asList( + CREDENTIAL_SERVICE, + FOLDER, + FILE_NAME, + CONFLICT_RESOLUTION, + CHUNKED_UPLOAD_THRESHOLD, + CHUNKED_UPLOAD_SIZE, + ProxyConfiguration.createProxyConfigPropertyDescriptor(false, ProxySpec.HTTP_AUTH) + )); + + private static final Set RELATIONSHIPS; + + static { + final Set rels = new HashSet<>(); + rels.add(REL_SUCCESS); + rels.add(REL_FAILURE); + RELATIONSHIPS = Collections.unmodifiableSet(rels); + } + + private DbxClientV2 dropboxApiClient; + + private DbxUploader dbxUploader; + + @Override + public Set getRelationships() { + return RELATIONSHIPS; + } + + @Override + protected List getSupportedPropertyDescriptors() { + return PROPERTIES; + } + + @OnScheduled + public void onScheduled(final ProcessContext context) { + dropboxApiClient = getDropboxApiClient(context, getIdentifier()); + } + + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + final String folder = context.getProperty(FOLDER).evaluateAttributeExpressions(flowFile).getValue(); + final String filename = context.getProperty(FILE_NAME).evaluateAttributeExpressions(flowFile).getValue(); + + final long chunkUploadThreshold = context.getProperty(CHUNKED_UPLOAD_THRESHOLD) + .asDataSize(DataUnit.B) + .longValue(); + + final long uploadChunkSize = context.getProperty(CHUNKED_UPLOAD_SIZE) + .asDataSize(DataUnit.B) + .longValue(); + + final String conflictResolution = context.getProperty(CONFLICT_RESOLUTION).getValue(); + + final long size = flowFile.getSize(); + final String uploadPath = convertFolderName(folder) + "/" + filename; + final long startNanos = System.nanoTime(); + FileMetadata fileMetadata = null; + + try { + try (final InputStream rawIn = session.read(flowFile)) { + if (size <= chunkUploadThreshold) { + try (UploadUploader uploader = createUploadUploader(uploadPath, conflictResolution)) { + fileMetadata = uploader.uploadAndFinish(rawIn); + } + } else { + fileMetadata = uploadLargeFileInChunks(uploadPath, rawIn, size, uploadChunkSize, conflictResolution); + } + } catch (UploadErrorException e) { + handleUploadError(conflictResolution, uploadPath, e); + } catch (RateLimitException e) { + context.yield(); + throw new ProcessException("Dropbox API rate limit exceeded while uploading file", e); + } + + if (fileMetadata != null) { + final Map attributes = createAttributeMap(fileMetadata); + String url = DROPBOX_HOME_URL + fileMetadata.getPathDisplay(); + flowFile = session.putAllAttributes(flowFile, attributes); + final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos); + session.getProvenanceReporter().send(flowFile, url, transferMillis); + } + + session.transfer(flowFile, REL_SUCCESS); + } catch (Exception e) { + getLogger().error("Exception occurred while uploading file '{}' to Dropbox folder '{}'", filename, folder, e); + flowFile = session.putAttribute(flowFile, ERROR_MESSAGE, e.getMessage()); + flowFile = session.penalize(flowFile); + session.transfer(flowFile, REL_FAILURE); + } + } + + @OnUnscheduled + public void shutdown() { + if (dbxUploader != null) { + dbxUploader.close(); + } + } + + private void handleUploadError(final String conflictResolution, final String uploadPath, final UploadErrorException e) throws UploadErrorException { + if (e.errorValue.isPath() && e.errorValue.getPathValue().getReason().isConflict()) { + + if (IGNORE_RESOLUTION.equals(conflictResolution)) { + getLogger().info("File with the same name [{}] already exists. Remote file is not modified due to {} being set to '{}'.", + uploadPath, CONFLICT_RESOLUTION.getDisplayName(), conflictResolution); + return; + } else if (conflictResolution.equals(FAIL_RESOLUTION)) { + throw new ProcessException(format("File with the same name [%s] already exists.", uploadPath), e); + } + } + throw new ProcessException(e); + } + + private FileMetadata uploadLargeFileInChunks(String path, InputStream rawIn, long size, long uploadChunkSize, String conflictResolution) throws DbxException, IOException { + final String sessionId; + try (UploadSessionStartUploader uploader = createUploadSessionStartUploader()) { + sessionId = uploader.uploadAndFinish(rawIn, uploadChunkSize).getSessionId(); + } + + long uploadedBytes = uploadChunkSize; + + UploadSessionCursor cursor = new UploadSessionCursor(sessionId, uploadedBytes); + + while (size - uploadedBytes > uploadChunkSize) { + try (UploadSessionAppendV2Uploader uploader = createUploadSessionAppendV2Uploader(cursor)) { + uploader.uploadAndFinish(rawIn, uploadChunkSize); + uploadedBytes += uploadChunkSize; + cursor = new UploadSessionCursor(sessionId, uploadedBytes); + } + } + + final long remainingBytes = size - uploadedBytes; + + final CommitInfo commitInfo = CommitInfo.newBuilder(path) + .withMode(getWriteMode(conflictResolution)) + .withStrictConflict(true) + .withClientModified(new Date(System.currentTimeMillis())) + .build(); + + try (UploadSessionFinishUploader uploader = createUploadSessionFinishUploader(cursor, commitInfo)) { + return uploader.uploadAndFinish(rawIn, remainingBytes); + } + } + + private WriteMode getWriteMode(String conflictResolution) { + if (OVERWRITE_RESOLUTION.equals(conflictResolution)) { + return WriteMode.OVERWRITE; + } else { + return WriteMode.ADD; + } + } + + private UploadUploader createUploadUploader(String path, String conflictResolution) throws DbxException { + final UploadUploader uploadUploader = dropboxApiClient + .files() + .uploadBuilder(path) + .withMode(getWriteMode(conflictResolution)) + .withStrictConflict(true) + .start(); + dbxUploader = uploadUploader; + return uploadUploader; + } + + private UploadSessionStartUploader createUploadSessionStartUploader() throws DbxException { + final UploadSessionStartUploader sessionStartUploader = dropboxApiClient + .files() + .uploadSessionStart(); + dbxUploader = sessionStartUploader; + return sessionStartUploader; + } + + private UploadSessionAppendV2Uploader createUploadSessionAppendV2Uploader(UploadSessionCursor cursor) throws DbxException { + final UploadSessionAppendV2Uploader sessionAppendV2Uploader = dropboxApiClient + .files() + .uploadSessionAppendV2(cursor); + dbxUploader = sessionAppendV2Uploader; + return sessionAppendV2Uploader; + } + + private UploadSessionFinishUploader createUploadSessionFinishUploader(UploadSessionCursor cursor, CommitInfo commitInfo) throws DbxException { + final UploadSessionFinishUploader sessionFinishUploader = dropboxApiClient + .files() + .uploadSessionFinish(cursor, commitInfo); + dbxUploader = sessionFinishUploader; + return sessionFinishUploader; + } +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-dropbox-bundle/nifi-dropbox-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-dropbox-bundle/nifi-dropbox-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index 3496b3318e..ff2f61db82 100644 --- a/nifi-nar-bundles/nifi-dropbox-bundle/nifi-dropbox-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi-nar-bundles/nifi-dropbox-bundle/nifi-dropbox-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -14,3 +14,4 @@ # limitations under the License. org.apache.nifi.processors.dropbox.ListDropbox org.apache.nifi.processors.dropbox.FetchDropbox +org.apache.nifi.processors.dropbox.PutDropbox diff --git a/nifi-nar-bundles/nifi-dropbox-bundle/nifi-dropbox-processors/src/test/java/org/apache/nifi/processors/dropbox/AbstractDropboxTest.java b/nifi-nar-bundles/nifi-dropbox-bundle/nifi-dropbox-processors/src/test/java/org/apache/nifi/processors/dropbox/AbstractDropboxTest.java new file mode 100644 index 0000000000..b4c8872827 --- /dev/null +++ b/nifi-nar-bundles/nifi-dropbox-bundle/nifi-dropbox-processors/src/test/java/org/apache/nifi/processors/dropbox/AbstractDropboxTest.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.dropbox; + +import static java.util.stream.Collectors.toSet; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.when; + +import com.dropbox.core.v2.DbxClientV2; +import com.dropbox.core.v2.files.FileMetadata; +import java.util.Collections; +import java.util.Date; +import java.util.Set; +import org.apache.nifi.dropbox.credentials.service.DropboxCredentialService; +import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.apache.nifi.provenance.ProvenanceEventType; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.junit.jupiter.api.BeforeEach; +import org.mockito.Mock; + +public class AbstractDropboxTest { + public static final String TEST_FOLDER = "/testFolder"; + public static final String FILENAME_1 = "file_name_1"; + public static final String FILENAME_2 = "file_name_2"; + public static final String FILE_ID_1 = "id:odTlUvbpIEAAAAAAAAAGGQ"; + public static final String FILE_ID_2 = "id:bdCQUvbpIEAABBAAAAAGKK"; + public static final long CREATED_TIME = 1659707000; + public static final long SIZE = 125; + public static final String REVISION = "5e4ddb1320676a5c29261"; + + protected TestRunner testRunner; + + @Mock + protected DbxClientV2 mockDropboxClient; + + @Mock + private DropboxCredentialService mockCredentialService; + + @BeforeEach + protected void setUp() throws Exception { + mockStandardDropboxCredentialService(); + } + + protected void assertProvenanceEvent(ProvenanceEventType eventType) { + Set expectedEventTypes = Collections.singleton(eventType); + Set actualEventTypes = testRunner.getProvenanceEvents().stream() + .map(ProvenanceEventRecord::getEventType) + .collect(toSet()); + assertEquals(expectedEventTypes, actualEventTypes); + } + + protected void assertNoProvenanceEvent() { + assertTrue(testRunner.getProvenanceEvents().isEmpty()); + } + + protected void mockStandardDropboxCredentialService() throws InitializationException { + String credentialServiceId = "dropbox_credentials"; + when(mockCredentialService.getIdentifier()).thenReturn(credentialServiceId); + testRunner.addControllerService(credentialServiceId, mockCredentialService); + testRunner.enableControllerService(mockCredentialService); + testRunner.setProperty(FetchDropbox.CREDENTIAL_SERVICE, credentialServiceId); + } + + protected FileMetadata createFileMetadata() { + return FileMetadata.newBuilder(FILENAME_1, FILE_ID_1, + new Date(CREATED_TIME), + new Date(CREATED_TIME), + REVISION, SIZE) + .withPathDisplay(getPath(TEST_FOLDER, FILENAME_1)) + .withIsDownloadable(true) + .build(); + } + + protected FileMetadata createFileMetadata( + String id, String filename, + String parent, + long createdTime, + boolean isDownloadable) { + return FileMetadata.newBuilder(filename, id, + new Date(createdTime), + new Date(createdTime), + REVISION, SIZE) + .withPathDisplay(getPath(parent, filename)) + .withIsDownloadable(isDownloadable) + .build(); + } + + protected FileMetadata createFileMetadata(String id, + String filename, + String parent, + long createdTime) { + return createFileMetadata(id, filename, parent, createdTime, true); + } + + protected void assertOutFlowFileAttributes(MockFlowFile flowFile) { + assertOutFlowFileAttributes(flowFile, TEST_FOLDER); + } + + protected void assertOutFlowFileAttributes(MockFlowFile flowFile, String folderName) { + flowFile.assertAttributeEquals(DropboxAttributes.ID, FILE_ID_1); + flowFile.assertAttributeEquals(DropboxAttributes.FILENAME, FILENAME_1); + flowFile.assertAttributeEquals(DropboxAttributes.PATH, folderName); + flowFile.assertAttributeEquals(DropboxAttributes.TIMESTAMP, Long.toString(CREATED_TIME)); + flowFile.assertAttributeEquals(DropboxAttributes.SIZE, Long.toString(SIZE)); + flowFile.assertAttributeEquals(DropboxAttributes.REVISION, REVISION); + } + + protected String getPath(String folder, String filename) { + return "/".equals(folder) ? folder + filename : folder + "/" + filename; + } +} diff --git a/nifi-nar-bundles/nifi-dropbox-bundle/nifi-dropbox-processors/src/test/java/org/apache/nifi/processors/dropbox/FetchDropboxTest.java b/nifi-nar-bundles/nifi-dropbox-bundle/nifi-dropbox-processors/src/test/java/org/apache/nifi/processors/dropbox/FetchDropboxTest.java index bb4bb9537b..c7920aff61 100644 --- a/nifi-nar-bundles/nifi-dropbox-bundle/nifi-dropbox-processors/src/test/java/org/apache/nifi/processors/dropbox/FetchDropboxTest.java +++ b/nifi-nar-bundles/nifi-dropbox-bundle/nifi-dropbox-processors/src/test/java/org/apache/nifi/processors/dropbox/FetchDropboxTest.java @@ -16,7 +16,9 @@ */ package org.apache.nifi.processors.dropbox; +import static java.lang.String.valueOf; import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.nifi.processors.dropbox.DropboxAttributes.ERROR_MESSAGE; import static org.mockito.Mockito.when; import com.dropbox.core.DbxDownloader; @@ -28,12 +30,9 @@ import java.io.ByteArrayInputStream; import java.util.HashMap; import java.util.List; import java.util.Map; -import org.apache.nifi.dropbox.credentials.service.DropboxCredentialService; import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.proxy.ProxyConfiguration; -import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.provenance.ProvenanceEventType; import org.apache.nifi.util.MockFlowFile; -import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -42,36 +41,19 @@ import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; @ExtendWith(MockitoExtension.class) -public class FetchDropboxTest { - - public static final String FILE_ID_1 = "id:odTlUvbpIEAAAAAAAAAGGQ"; - public static final String FILE_ID_2 = "id:odTlUvbpIEBBBBBBBBBGGQ"; - public static final String FILENAME = "file_name"; - public static final String FOLDER = "/testFolder"; - public static final String SIZE = "125"; - public static final String CREATED_TIME = "1659707000"; - public static final String REVISION = "5e4ddb1320676a5c29261"; - - private TestRunner testRunner; - - @Mock - private DbxClientV2 mockDropboxClient; - - @Mock - private DropboxCredentialService credentialService; +public class FetchDropboxTest extends AbstractDropboxTest { @Mock private DbxUserFilesRequests mockDbxUserFilesRequest; - @Mock private DbxDownloader mockDbxDownloader; @BeforeEach - void setUp() throws Exception { + public void setUp() throws Exception { FetchDropbox testSubject = new FetchDropbox() { @Override - public DbxClientV2 getDropboxApiClient(ProcessContext context, ProxyConfiguration proxyConfiguration, String clientId) { + public DbxClientV2 getDropboxApiClient(ProcessContext context, String id) { return mockDropboxClient; } }; @@ -79,19 +61,18 @@ public class FetchDropboxTest { testRunner = TestRunners.newTestRunner(testSubject); when(mockDropboxClient.files()).thenReturn(mockDbxUserFilesRequest); - - mockStandardDropboxCredentialService(); + super.setUp(); } @Test void testFileIsDownloadedById() throws Exception { - testRunner.setProperty(FetchDropbox.FILE, "${dropbox.id}"); when(mockDbxUserFilesRequest.download(FILE_ID_1)).thenReturn(mockDbxDownloader); when(mockDbxDownloader.getInputStream()).thenReturn(new ByteArrayInputStream("content".getBytes(UTF_8))); + when(mockDbxDownloader.getResult()).thenReturn(createFileMetadata()); - MockFlowFile inputFlowFile = getMockFlowFile(FILE_ID_1); + MockFlowFile inputFlowFile = getMockFlowFile(); testRunner.enqueue(inputFlowFile); testRunner.run(); @@ -99,17 +80,19 @@ public class FetchDropboxTest { List flowFiles = testRunner.getFlowFilesForRelationship(FetchDropbox.REL_SUCCESS); MockFlowFile ff0 = flowFiles.get(0); ff0.assertContentEquals("content"); - assertOutFlowFileAttributes(ff0, FILE_ID_1); + assertOutFlowFileAttributes(ff0); + assertProvenanceEvent(ProvenanceEventType.FETCH); } @Test void testFileIsDownloadedByPath() throws Exception { testRunner.setProperty(FetchDropbox.FILE, "${path}/${filename}"); - when(mockDbxUserFilesRequest.download(FOLDER + "/" + FILENAME)).thenReturn(mockDbxDownloader); + when(mockDbxUserFilesRequest.download(getPath(TEST_FOLDER, FILENAME_1))).thenReturn(mockDbxDownloader); when(mockDbxDownloader.getInputStream()).thenReturn(new ByteArrayInputStream("contentByPath".getBytes(UTF_8))); + when(mockDbxDownloader.getResult()).thenReturn(createFileMetadata()); - MockFlowFile inputFlowFile = getMockFlowFile(FILE_ID_1); + MockFlowFile inputFlowFile = getMockFlowFile(); testRunner.enqueue(inputFlowFile); testRunner.run(); @@ -117,53 +100,38 @@ public class FetchDropboxTest { List flowFiles = testRunner.getFlowFilesForRelationship(FetchDropbox.REL_SUCCESS); MockFlowFile ff0 = flowFiles.get(0); ff0.assertContentEquals("contentByPath"); - assertOutFlowFileAttributes(ff0, FILE_ID_1); + assertOutFlowFileAttributes(ff0); + assertProvenanceEvent(ProvenanceEventType.FETCH); } @Test void testFetchFails() throws Exception { testRunner.setProperty(FetchDropbox.FILE, "${dropbox.id}"); - when(mockDbxUserFilesRequest.download(FILE_ID_2)).thenThrow(new DbxException("Error in Dropbox")); + when(mockDbxUserFilesRequest.download(FILE_ID_1)).thenThrow(new DbxException("Error in Dropbox")); - MockFlowFile inputFlowFile = getMockFlowFile(FILE_ID_2); + MockFlowFile inputFlowFile = getMockFlowFile(); testRunner.enqueue(inputFlowFile); testRunner.run(); testRunner.assertAllFlowFilesTransferred(FetchDropbox.REL_FAILURE, 1); List flowFiles = testRunner.getFlowFilesForRelationship(FetchDropbox.REL_FAILURE); MockFlowFile ff0 = flowFiles.get(0); - ff0.assertAttributeEquals("error.message", "Error in Dropbox"); - assertOutFlowFileAttributes(ff0, FILE_ID_2); + ff0.assertAttributeEquals(ERROR_MESSAGE, "Error in Dropbox"); + assertOutFlowFileAttributes(ff0); + assertNoProvenanceEvent(); } - private void mockStandardDropboxCredentialService() throws InitializationException { - String credentialServiceId = "dropbox_credentials"; - when(credentialService.getIdentifier()).thenReturn(credentialServiceId); - testRunner.addControllerService(credentialServiceId, credentialService); - testRunner.enableControllerService(credentialService); - testRunner.setProperty(FetchDropbox.CREDENTIAL_SERVICE, credentialServiceId); - } - - private MockFlowFile getMockFlowFile(String fileId) { + private MockFlowFile getMockFlowFile() { MockFlowFile inputFlowFile = new MockFlowFile(0); Map attributes = new HashMap<>(); - attributes.put(DropboxFileInfo.ID, fileId); - attributes.put(DropboxFileInfo.REVISION, REVISION); - attributes.put(DropboxFileInfo.FILENAME, FILENAME); - attributes.put(DropboxFileInfo.PATH, FOLDER); - attributes.put(DropboxFileInfo.SIZE, SIZE); - attributes.put(DropboxFileInfo.TIMESTAMP, CREATED_TIME); + attributes.put(DropboxAttributes.ID, FILE_ID_1); + attributes.put(DropboxAttributes.REVISION, REVISION); + attributes.put(DropboxAttributes.FILENAME, FILENAME_1); + attributes.put(DropboxAttributes.PATH, TEST_FOLDER); + attributes.put(DropboxAttributes.SIZE, valueOf(SIZE)); + attributes.put(DropboxAttributes.TIMESTAMP, valueOf(CREATED_TIME)); inputFlowFile.putAttributes(attributes); return inputFlowFile; } - - private void assertOutFlowFileAttributes(MockFlowFile flowFile, String fileId) { - flowFile.assertAttributeEquals(DropboxFileInfo.ID, fileId); - flowFile.assertAttributeEquals(DropboxFileInfo.REVISION, REVISION); - flowFile.assertAttributeEquals(DropboxFileInfo.PATH, FOLDER); - flowFile.assertAttributeEquals(DropboxFileInfo.SIZE, SIZE); - flowFile.assertAttributeEquals(DropboxFileInfo.TIMESTAMP, CREATED_TIME); - flowFile.assertAttributeEquals(DropboxFileInfo.FILENAME, FILENAME); - } } diff --git a/nifi-nar-bundles/nifi-dropbox-bundle/nifi-dropbox-processors/src/test/java/org/apache/nifi/processors/dropbox/ListDropboxTest.java b/nifi-nar-bundles/nifi-dropbox-bundle/nifi-dropbox-processors/src/test/java/org/apache/nifi/processors/dropbox/ListDropboxTest.java index bdabbf9600..be08c2dc94 100644 --- a/nifi-nar-bundles/nifi-dropbox-bundle/nifi-dropbox-processors/src/test/java/org/apache/nifi/processors/dropbox/ListDropboxTest.java +++ b/nifi-nar-bundles/nifi-dropbox-bundle/nifi-dropbox-processors/src/test/java/org/apache/nifi/processors/dropbox/ListDropboxTest.java @@ -26,7 +26,6 @@ import static org.mockito.Mockito.when; import com.dropbox.core.DbxException; import com.dropbox.core.v2.DbxClientV2; import com.dropbox.core.v2.files.DbxUserFilesRequests; -import com.dropbox.core.v2.files.FileMetadata; import com.dropbox.core.v2.files.FolderMetadata; import com.dropbox.core.v2.files.ListFolderBuilder; import com.dropbox.core.v2.files.ListFolderResult; @@ -37,18 +36,14 @@ import com.fasterxml.jackson.databind.ObjectMapper; import java.io.IOException; import java.util.Arrays; import java.util.Collections; -import java.util.Date; import java.util.List; import java.util.Spliterator; import java.util.stream.StreamSupport; -import org.apache.nifi.dropbox.credentials.service.DropboxCredentialService; import org.apache.nifi.json.JsonRecordSetWriter; import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.proxy.ProxyConfiguration; import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.serialization.RecordSetWriterFactory; import org.apache.nifi.util.MockFlowFile; -import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -57,27 +52,16 @@ import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; @ExtendWith(MockitoExtension.class) -public class ListDropboxTest { +public class ListDropboxTest extends AbstractDropboxTest { - public static final String ID_1 = "id:11111"; - public static final String ID_2 = "id:22222"; - public static final String TEST_FOLDER = "/testFolder"; - public static final String FILENAME_1 = "file_name_1"; - public static final String FILENAME_2 = "file_name_2"; - public static final long SIZE = 125; - public static final long CREATED_TIME = 1659707000; - public static final String REVISION = "5e4ddb1320676a5c29261"; + public static final String FOLDER_ID = "id:11111"; public static final boolean IS_RECURSIVE = true; public static final long MIN_TIMESTAMP = 1659707000; public static final long OLD_CREATED_TIME = 1657375066; - private TestRunner testRunner; @Mock private DbxClientV2 mockDropboxClient; - @Mock - private DropboxCredentialService credentialService; - @Mock private DbxUserFilesRequests mockDbxUserFilesRequest; @@ -88,10 +72,10 @@ public class ListDropboxTest { private ListFolderBuilder mockListFolderBuilder; @BeforeEach - void setUp() throws Exception { + protected void setUp() throws Exception { ListDropbox testSubject = new ListDropbox() { @Override - public DbxClientV2 getDropboxApiClient(ProcessContext context, ProxyConfiguration proxyConfiguration, String clientId) { + public DbxClientV2 getDropboxApiClient(ProcessContext context, String id) { return mockDropboxClient; } @@ -104,10 +88,9 @@ public class ListDropboxTest { testRunner = TestRunners.newTestRunner(testSubject); - mockStandardDropboxCredentialService(); - testRunner.setProperty(ListDropbox.RECURSIVE_SEARCH, Boolean.toString(IS_RECURSIVE)); testRunner.setProperty(ListDropbox.MIN_AGE, "0 sec"); + super.setUp(); } @Test @@ -140,7 +123,7 @@ public class ListDropboxTest { //root is listed when "" is used in Dropbox API when(mockDbxUserFilesRequest.listFolderBuilder("")).thenReturn(mockListFolderBuilder); when(mockListFolderResult.getEntries()).thenReturn(singletonList( - createFileMetadata(FILENAME_1, folderName, ID_1, CREATED_TIME) + createFileMetadata(FILE_ID_1, FILENAME_1, folderName, CREATED_TIME) )); testRunner.run(); @@ -148,7 +131,7 @@ public class ListDropboxTest { testRunner.assertAllFlowFilesTransferred(ListDropbox.REL_SUCCESS, 1); List flowFiles = testRunner.getFlowFilesForRelationship(ListDropbox.REL_SUCCESS); MockFlowFile ff0 = flowFiles.get(0); - assertFlowFileAttributes(ff0, folderName); + assertOutFlowFileAttributes(ff0, folderName); } @Test @@ -159,9 +142,9 @@ public class ListDropboxTest { when(mockDbxUserFilesRequest.listFolderBuilder(TEST_FOLDER)).thenReturn(mockListFolderBuilder); when(mockListFolderResult.getEntries()).thenReturn(Arrays.asList( - createFileMetadata(FILENAME_1, TEST_FOLDER, ID_1, CREATED_TIME), - createFolderMetadata("testFolder1", TEST_FOLDER), - createFileMetadata(FILENAME_2, TEST_FOLDER, ID_2, CREATED_TIME, false) + createFileMetadata(FILE_ID_1, FILENAME_1, TEST_FOLDER, CREATED_TIME), + createFolderMetadata(), + createFileMetadata(FILE_ID_2, FILENAME_2, TEST_FOLDER, CREATED_TIME, false) )); testRunner.run(); @@ -169,7 +152,7 @@ public class ListDropboxTest { testRunner.assertAllFlowFilesTransferred(ListDropbox.REL_SUCCESS, 1); List flowFiles = testRunner.getFlowFilesForRelationship(ListDropbox.REL_SUCCESS); MockFlowFile ff0 = flowFiles.get(0); - assertFlowFileAttributes(ff0, TEST_FOLDER); + assertOutFlowFileAttributes(ff0); } @Test @@ -180,8 +163,8 @@ public class ListDropboxTest { when(mockDbxUserFilesRequest.listFolderBuilder(TEST_FOLDER)).thenReturn(mockListFolderBuilder); when(mockListFolderResult.getEntries()).thenReturn(Arrays.asList( - createFileMetadata(FILENAME_1, TEST_FOLDER, ID_1, CREATED_TIME), - createFileMetadata(FILENAME_2, TEST_FOLDER, ID_2, OLD_CREATED_TIME) + createFileMetadata(FILE_ID_1, FILENAME_1, TEST_FOLDER, CREATED_TIME), + createFileMetadata(FILE_ID_2, FILENAME_2, TEST_FOLDER, OLD_CREATED_TIME) )); testRunner.run(); @@ -189,7 +172,7 @@ public class ListDropboxTest { testRunner.assertAllFlowFilesTransferred(ListDropbox.REL_SUCCESS, 1); List flowFiles = testRunner.getFlowFilesForRelationship(ListDropbox.REL_SUCCESS); MockFlowFile ff0 = flowFiles.get(0); - assertFlowFileAttributes(ff0, TEST_FOLDER); + assertOutFlowFileAttributes(ff0); } @Test @@ -201,8 +184,8 @@ public class ListDropboxTest { when(mockDbxUserFilesRequest.listFolderBuilder(TEST_FOLDER)).thenReturn(mockListFolderBuilder); when(mockListFolderResult.getEntries()).thenReturn(Arrays.asList( - createFileMetadata(FILENAME_1, TEST_FOLDER, ID_1, CREATED_TIME), - createFileMetadata(FILENAME_2, TEST_FOLDER, ID_2, CREATED_TIME) + createFileMetadata(FILE_ID_1, FILENAME_1, TEST_FOLDER, CREATED_TIME), + createFileMetadata(FILE_ID_2, FILENAME_2, TEST_FOLDER, CREATED_TIME) )); testRunner.run(); @@ -216,52 +199,12 @@ public class ListDropboxTest { assertEquals(expectedFileNames, actualFileNames); } - private void assertFlowFileAttributes(MockFlowFile flowFile, String folderName) { - flowFile.assertAttributeEquals(DropboxFileInfo.ID, ID_1); - flowFile.assertAttributeEquals(DropboxFileInfo.FILENAME, FILENAME_1); - flowFile.assertAttributeEquals(DropboxFileInfo.PATH, folderName); - flowFile.assertAttributeEquals(DropboxFileInfo.TIMESTAMP, Long.toString(CREATED_TIME)); - flowFile.assertAttributeEquals(DropboxFileInfo.SIZE, Long.toString(SIZE)); - flowFile.assertAttributeEquals(DropboxFileInfo.REVISION, REVISION); - } - - private FileMetadata createFileMetadata( - String filename, - String parent, - String id, - long createdTime, - boolean isDownloadable) { - return FileMetadata.newBuilder(filename, id, - new Date(createdTime), - new Date(createdTime), - REVISION, SIZE) - .withPathDisplay(parent + "/" + filename) - .withIsDownloadable(isDownloadable) + private Metadata createFolderMetadata() { + return FolderMetadata.newBuilder(FOLDER_ID) + .withPathDisplay(TEST_FOLDER + "/" + FOLDER_ID) .build(); } - private FileMetadata createFileMetadata( - String filename, - String parent, - String id, - long createdTime) { - return createFileMetadata(filename, parent, id, createdTime, true); - } - - private Metadata createFolderMetadata(String folderName, String parent) { - return FolderMetadata.newBuilder(folderName) - .withPathDisplay(parent + "/" + folderName) - .build(); - } - - private void mockStandardDropboxCredentialService() throws Exception { - String credentialServiceId = "dropbox_credentials"; - when(credentialService.getIdentifier()).thenReturn(credentialServiceId); - testRunner.addControllerService(credentialServiceId, credentialService); - testRunner.enableControllerService(credentialService); - testRunner.setProperty(ListDropbox.CREDENTIAL_SERVICE, credentialServiceId); - } - private void mockRecordWriter() throws InitializationException { RecordSetWriterFactory recordWriter = new JsonRecordSetWriter(); testRunner.addControllerService("record_writer", recordWriter); diff --git a/nifi-nar-bundles/nifi-dropbox-bundle/nifi-dropbox-processors/src/test/java/org/apache/nifi/processors/dropbox/PutDropboxIT.java b/nifi-nar-bundles/nifi-dropbox-bundle/nifi-dropbox-processors/src/test/java/org/apache/nifi/processors/dropbox/PutDropboxIT.java new file mode 100644 index 0000000000..5b2645b290 --- /dev/null +++ b/nifi-nar-bundles/nifi-dropbox-bundle/nifi-dropbox-processors/src/test/java/org/apache/nifi/processors/dropbox/PutDropboxIT.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.dropbox; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +public class PutDropboxIT extends AbstractDropboxIT { + + private static final String CONTENT = "content"; + private static final String CHANGED_CONTENT = "changedContent"; + private static final String NON_EXISTING_FOLDER = "/doesNotExistYet"; + + @BeforeEach + public void init() throws Exception { + super.init(); + testRunner.setProperty(PutDropbox.FILE_NAME, "testFile.json"); + } + + @AfterEach + public void teardown() throws Exception { + super.teardown(); + deleteFolderIfExists(NON_EXISTING_FOLDER); + } + + @Override + protected PutDropbox createTestSubject() { + return new PutDropbox(); + } + + @Test + void testUploadFileToExistingDirectory() { + testRunner.setProperty(PutDropbox.FOLDER, MAIN_FOLDER); + + testRunner.enqueue(CONTENT); + testRunner.run(); + + testRunner.assertTransferCount(PutDropbox.REL_FAILURE, 0); + testRunner.assertTransferCount(PutDropbox.REL_SUCCESS, 1); + } + + @Test + void testUploadFileCreateFolderWithSubFolders() { + testRunner.setProperty(PutDropbox.FOLDER, NON_EXISTING_FOLDER + "/subFolder1/subFolder2"); + + testRunner.enqueue(CONTENT); + testRunner.run(); + + testRunner.assertTransferCount(PutDropbox.REL_FAILURE, 0); + testRunner.assertTransferCount(PutDropbox.REL_SUCCESS, 1); + } + + @Test + void testEmptyFileIsUpladed() { + testRunner.setProperty(PutDropbox.FOLDER, MAIN_FOLDER); + + testRunner.enqueue(""); + testRunner.run(); + + testRunner.assertTransferCount(PutDropbox.REL_FAILURE, 0); + testRunner.assertTransferCount(PutDropbox.REL_SUCCESS, 1); + } + + @Test + void testUploadExistingFileFailStrategy() { + testRunner.setProperty(PutDropbox.FOLDER, MAIN_FOLDER); + testRunner.setProperty(PutDropbox.CONFLICT_RESOLUTION, PutDropbox.FAIL_RESOLUTION); + + testRunner.enqueue(CONTENT); + testRunner.run(); + + testRunner.assertTransferCount(PutDropbox.REL_SUCCESS, 1); + testRunner.assertTransferCount(PutDropbox.REL_FAILURE, 0); + testRunner.clearTransferState(); + + testRunner.enqueue(CHANGED_CONTENT); + testRunner.run(); + testRunner.assertTransferCount(PutDropbox.REL_SUCCESS, 0); + testRunner.assertTransferCount(PutDropbox.REL_FAILURE, 1); + } + + @Test + void testUploadExistingFileWithSameContentFailStrategy() { + testRunner.setProperty(PutDropbox.FOLDER, MAIN_FOLDER); + testRunner.setProperty(PutDropbox.CONFLICT_RESOLUTION, PutDropbox.FAIL_RESOLUTION); + + testRunner.enqueue(CONTENT); + testRunner.run(); + + testRunner.assertTransferCount(PutDropbox.REL_SUCCESS, 1); + testRunner.assertTransferCount(PutDropbox.REL_FAILURE, 0); + + testRunner.clearTransferState(); + + testRunner.enqueue(CONTENT); + testRunner.run(); + testRunner.assertTransferCount(PutDropbox.REL_SUCCESS, 0); + testRunner.assertTransferCount(PutDropbox.REL_FAILURE, 1); + } + + @Test + void testUploadExistingFileOverwriteStrategy() { + testRunner.setProperty(PutDropbox.FOLDER, MAIN_FOLDER); + testRunner.setProperty(PutDropbox.CONFLICT_RESOLUTION, PutDropbox.OVERWRITE_RESOLUTION); + + testRunner.enqueue(CONTENT); + testRunner.run(); + + testRunner.assertTransferCount(PutDropbox.REL_FAILURE, 0); + testRunner.assertTransferCount(PutDropbox.REL_SUCCESS, 1); + testRunner.clearTransferState(); + + testRunner.enqueue(CHANGED_CONTENT); + testRunner.run(); + testRunner.assertTransferCount(PutDropbox.REL_SUCCESS, 1); + testRunner.assertTransferCount(PutDropbox.REL_FAILURE, 0); + } + + @Test + void testUploadExistingFileIgnoreStrategy() { + testRunner.setProperty(PutDropbox.FOLDER, MAIN_FOLDER); + testRunner.setProperty(PutDropbox.CONFLICT_RESOLUTION, PutDropbox.IGNORE_RESOLUTION); + + testRunner.enqueue(CONTENT); + testRunner.run(); + + testRunner.assertTransferCount(PutDropbox.REL_FAILURE, 0); + testRunner.assertTransferCount(PutDropbox.REL_SUCCESS, 1); + testRunner.clearTransferState(); + + testRunner.enqueue(CHANGED_CONTENT); + testRunner.run(); + + testRunner.assertTransferCount(PutDropbox.REL_SUCCESS, 1); + testRunner.assertTransferCount(PutDropbox.REL_FAILURE, 0); + } +} diff --git a/nifi-nar-bundles/nifi-dropbox-bundle/nifi-dropbox-processors/src/test/java/org/apache/nifi/processors/dropbox/PutDropboxTest.java b/nifi-nar-bundles/nifi-dropbox-bundle/nifi-dropbox-processors/src/test/java/org/apache/nifi/processors/dropbox/PutDropboxTest.java new file mode 100644 index 0000000000..05e4c2b05f --- /dev/null +++ b/nifi-nar-bundles/nifi-dropbox-bundle/nifi-dropbox-processors/src/test/java/org/apache/nifi/processors/dropbox/PutDropboxTest.java @@ -0,0 +1,333 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.dropbox; + +import static com.dropbox.core.v2.files.UploadError.path; +import static com.dropbox.core.v2.files.WriteConflictError.FILE; +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.nifi.processors.dropbox.DropboxAttributes.ERROR_MESSAGE; +import static org.mockito.Answers.RETURNS_DEEP_STUBS; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import com.dropbox.core.DbxException; +import com.dropbox.core.LocalizedText; +import com.dropbox.core.v2.DbxClientV2; +import com.dropbox.core.v2.files.CommitInfo; +import com.dropbox.core.v2.files.DbxUserFilesRequests; +import com.dropbox.core.v2.files.UploadErrorException; +import com.dropbox.core.v2.files.UploadSessionAppendV2Uploader; +import com.dropbox.core.v2.files.UploadSessionCursor; +import com.dropbox.core.v2.files.UploadSessionFinishUploader; +import com.dropbox.core.v2.files.UploadSessionStartResult; +import com.dropbox.core.v2.files.UploadSessionStartUploader; +import com.dropbox.core.v2.files.UploadUploader; +import com.dropbox.core.v2.files.UploadWriteFailed; +import com.dropbox.core.v2.files.WriteError; +import com.dropbox.core.v2.files.WriteMode; +import java.io.InputStream; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.provenance.ProvenanceEventType; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunners; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +public class PutDropboxTest extends AbstractDropboxTest { + + public static final long CHUNKED_UPLOAD_SIZE_IN_BYTES = 8; + public static final long CHUNKED_UPLOAD_THRESHOLD_IN_BYTES = 15; + public static final String CONTENT = "1234567890"; + public static final String LARGE_CONTENT_30B = "123456789012345678901234567890"; + public static final String SESSION_ID = "sessionId"; + + + @Mock(answer = RETURNS_DEEP_STUBS) + private DbxUserFilesRequests mockDbxUserFilesRequest; + + @Mock + private UploadUploader mockUploadUploader; + + @Mock + private UploadSessionStartUploader mockUploadSessionStartUploader; + + @Mock + private UploadSessionStartResult mockUploadSessionStartResult; + + @Mock + private UploadSessionAppendV2Uploader mockUploadSessionAppendV2Uploader; + + @Mock + private UploadSessionFinishUploader mockUploadSessionFinishUploader; + + @BeforeEach + protected void setUp() throws Exception { + final PutDropbox testSubject = new PutDropbox() { + @Override + public DbxClientV2 getDropboxApiClient(ProcessContext context, String id) { + return mockDropboxClient; + } + }; + testRunner = TestRunners.newTestRunner(testSubject); + testRunner.setProperty(PutDropbox.FOLDER, TEST_FOLDER); + super.setUp(); + } + + @Test + void testFolderValidity() { + testRunner.setProperty(PutDropbox.FOLDER, "/"); + testRunner.assertValid(); + testRunner.setProperty(PutDropbox.FOLDER, "/tempFolder"); + testRunner.assertValid(); + } + + @Test + void testUploadChunkSizeValidity() { + testRunner.setProperty(PutDropbox.CHUNKED_UPLOAD_SIZE, ""); + testRunner.assertNotValid(); + testRunner.setProperty(PutDropbox.CHUNKED_UPLOAD_SIZE, "40 MB"); + testRunner.assertValid(); + testRunner.setProperty(PutDropbox.CHUNKED_UPLOAD_SIZE, "152 MB"); + testRunner.assertNotValid(); + testRunner.setProperty(PutDropbox.CHUNKED_UPLOAD_SIZE, "1024"); + testRunner.assertNotValid(); + } + + @Test + void testFileUploadFileNameFromProperty() throws Exception { + testRunner.setProperty(PutDropbox.FILE_NAME, FILENAME_1); + mockFileUpload(TEST_FOLDER, FILENAME_1); + runWithFlowFile(); + + testRunner.assertAllFlowFilesTransferred(PutDropbox.REL_SUCCESS, 1); + List flowFiles = testRunner.getFlowFilesForRelationship(PutDropbox.REL_SUCCESS); + MockFlowFile ff0 = flowFiles.get(0); + assertOutFlowFileAttributes(ff0); + assertProvenanceEvent(ProvenanceEventType.SEND); + } + + @Test + void testFileUploadFileNameFromFlowFileAttribute() throws Exception { + mockFileUpload(TEST_FOLDER, FILENAME_2); + + final MockFlowFile mockFlowFile = getMockFlowFile(CONTENT); + final Map attributes = new HashMap<>(); + attributes.put("filename", FILENAME_2); + mockFlowFile.putAttributes(attributes); + testRunner.enqueue(mockFlowFile); + testRunner.run(); + + testRunner.assertAllFlowFilesTransferred(PutDropbox.REL_SUCCESS, 1); + assertProvenanceEvent(ProvenanceEventType.SEND); + } + + @Test + void testFileUploadFileToRoot() throws Exception { + testRunner.setProperty(PutDropbox.FOLDER, "/"); + testRunner.setProperty(PutDropbox.FILE_NAME, FILENAME_1); + + mockFileUpload("/", FILENAME_1); + + runWithFlowFile(); + testRunner.assertAllFlowFilesTransferred(PutDropbox.REL_SUCCESS, 1); + assertProvenanceEvent(ProvenanceEventType.SEND); + + List flowFiles = testRunner.getFlowFilesForRelationship(PutDropbox.REL_SUCCESS); + MockFlowFile ff0 = flowFiles.get(0); + assertOutFlowFileAttributes(ff0, "/"); + } + + @Test + void testFileUploadWithOverwriteConflictResolutionStrategy() throws Exception { + testRunner.setProperty(PutDropbox.FILE_NAME, FILENAME_1); + testRunner.setProperty(PutDropbox.CONFLICT_RESOLUTION, PutDropbox.OVERWRITE_RESOLUTION); + + mockFileUpload(TEST_FOLDER, FILENAME_1, WriteMode.OVERWRITE); + + runWithFlowFile(); + testRunner.assertAllFlowFilesTransferred(PutDropbox.REL_SUCCESS, 1); + assertProvenanceEvent(ProvenanceEventType.SEND); + } + + @Test + void testFileUploadError() throws Exception { + testRunner.setProperty(PutDropbox.FILE_NAME, FILENAME_1); + + mockFileUploadError(new DbxException("Dropbox error")); + + runWithFlowFile(); + + testRunner.assertAllFlowFilesTransferred(PutDropbox.REL_FAILURE, 1); + List flowFiles = testRunner.getFlowFilesForRelationship(PutDropbox.REL_FAILURE); + MockFlowFile ff0 = flowFiles.get(0); + ff0.assertAttributeEquals(ERROR_MESSAGE, "Dropbox error"); + assertNoProvenanceEvent(); + } + + @Test + void testFileUploadOtherExceptionIsNotIgnored() throws Exception { + testRunner.setProperty(PutDropbox.FILE_NAME, FILENAME_1); + testRunner.setProperty(PutDropbox.CONFLICT_RESOLUTION, PutDropbox.IGNORE_RESOLUTION); + + mockFileUploadError(getException(WriteError.INSUFFICIENT_SPACE)); + + runWithFlowFile(); + testRunner.assertAllFlowFilesTransferred(PutDropbox.REL_FAILURE, 1); + assertNoProvenanceEvent(); + } + + @Test + void testFileUploadConflictIgnoredWithIgnoreResolutionStrategy() throws Exception { + testRunner.setProperty(PutDropbox.FILE_NAME, FILENAME_1); + testRunner.setProperty(PutDropbox.CONFLICT_RESOLUTION, PutDropbox.IGNORE_RESOLUTION); + + mockFileUploadError(getException(WriteError.conflict(FILE))); + + runWithFlowFile(); + testRunner.assertAllFlowFilesTransferred(PutDropbox.REL_SUCCESS, 1); + assertNoProvenanceEvent(); + } + + @Test + void testFileUploadConflictNotIgnoredWithDefaultFailStrategy() throws Exception { + testRunner.setProperty(PutDropbox.FILE_NAME, FILENAME_1); + + mockFileUploadError(getException(WriteError.conflict(FILE))); + + runWithFlowFile(); + testRunner.assertAllFlowFilesTransferred(PutDropbox.REL_FAILURE, 1); + assertNoProvenanceEvent(); + } + + @Test + void testFileUploadLargeFile() throws Exception { + MockFlowFile mockFlowFile = getMockFlowFile(LARGE_CONTENT_30B); + + testRunner.setProperty(PutDropbox.FILE_NAME, FILENAME_1); + testRunner.setProperty(PutDropbox.CHUNKED_UPLOAD_SIZE, CHUNKED_UPLOAD_SIZE_IN_BYTES + " B"); + testRunner.setProperty(PutDropbox.CHUNKED_UPLOAD_THRESHOLD, CHUNKED_UPLOAD_THRESHOLD_IN_BYTES + " B"); + + when(mockDropboxClient.files()) + .thenReturn(mockDbxUserFilesRequest); + + //start session: 8 bytes uploaded + when(mockDbxUserFilesRequest + .uploadSessionStart()) + .thenReturn(mockUploadSessionStartUploader); + + when(mockUploadSessionStartUploader + .uploadAndFinish(any(InputStream.class), eq(CHUNKED_UPLOAD_SIZE_IN_BYTES))) + .thenReturn(mockUploadSessionStartResult); + + when(mockUploadSessionStartResult + .getSessionId()) + .thenReturn(SESSION_ID); + + //append session: invoked twice, 2 * 8 bytes uploaded + when(mockDbxUserFilesRequest + .uploadSessionAppendV2(any(UploadSessionCursor.class))) + .thenReturn(mockUploadSessionAppendV2Uploader); + + //finish session: 30 - 8 - 2 * 8 = 6 bytes uploaded + CommitInfo commitInfo = CommitInfo.newBuilder(getPath(TEST_FOLDER , FILENAME_1)) + .withMode(WriteMode.ADD) + .withStrictConflict(true) + .withClientModified(new Date(mockFlowFile.getEntryDate())) + .build(); + + when(mockDbxUserFilesRequest + .uploadSessionFinish(any(UploadSessionCursor.class), eq(commitInfo))) + .thenReturn(mockUploadSessionFinishUploader); + + when(mockUploadSessionFinishUploader + .uploadAndFinish(any(InputStream.class), eq(6L))) + .thenReturn(createFileMetadata(FILE_ID_1, FILENAME_1, TEST_FOLDER, CREATED_TIME)); + + testRunner.enqueue(mockFlowFile); + testRunner.run(); + testRunner.assertAllFlowFilesTransferred(PutDropbox.REL_SUCCESS, 1); + + verify(mockUploadSessionAppendV2Uploader, times(2)) + .uploadAndFinish(any(InputStream.class), eq(CHUNKED_UPLOAD_SIZE_IN_BYTES)); + assertProvenanceEvent(ProvenanceEventType.SEND); + } + + private void mockFileUpload(String folder, String filename) throws Exception { + mockFileUpload(folder, filename, WriteMode.ADD); + } + + private void mockFileUpload(String folder, String filename, WriteMode writeMode) throws Exception { + when(mockDropboxClient.files()) + .thenReturn(mockDbxUserFilesRequest); + + when(mockDbxUserFilesRequest + .uploadBuilder(getPath(folder, filename)) + .withMode(writeMode) + .withStrictConflict(true) + .start()) + .thenReturn(mockUploadUploader); + + when(mockUploadUploader + .uploadAndFinish(any(InputStream.class))) + .thenReturn(createFileMetadata(FILE_ID_1, filename, folder, CREATED_TIME)); + } + + private void mockFileUploadError(DbxException exception) throws Exception { + when(mockDropboxClient.files()) + .thenReturn(mockDbxUserFilesRequest); + + when(mockDbxUserFilesRequest + .uploadBuilder(getPath(TEST_FOLDER, FILENAME_1)) + .withMode(WriteMode.ADD) + .withStrictConflict(true) + .start()) + .thenReturn(mockUploadUploader); + + when(mockUploadUploader + .uploadAndFinish(any(InputStream.class))) + .thenThrow(exception); + } + + private UploadErrorException getException(WriteError writeErrorReason) { + return new UploadErrorException("route", "requestId", new LocalizedText("upload error", "en-us"), + path(new UploadWriteFailed(writeErrorReason, "uploadSessionId"))); + } + + private MockFlowFile getMockFlowFile(String content) { + MockFlowFile inputFlowFile = new MockFlowFile(0); + inputFlowFile.setData(content.getBytes(UTF_8)); + return inputFlowFile; + } + + private void runWithFlowFile() { + MockFlowFile mockFlowFile = getMockFlowFile(CONTENT); + testRunner.enqueue(mockFlowFile); + testRunner.run(); + } +} \ No newline at end of file