NIFI-10868 PutDropbox processor

This closes #6740.

Signed-off-by: Peter Turcsanyi <turcsanyi@apache.org>
This commit is contained in:
krisztina-zsihovszki 2022-11-30 14:39:51 +01:00 committed by Peter Turcsanyi
parent 5c3ca9d537
commit 1d5a1bff08
No known key found for this signature in database
GPG Key ID: 55A813F1C3E553DC
13 changed files with 1195 additions and 198 deletions

View File

@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.dropbox;
public class DropboxAttributes {
public static final String ID = "dropbox.id";
public static final String ID_DESC = "The Dropbox identifier of the file";
public static final String PATH = "path";
public static final String PATH_DESC = "The folder path where the file is located";
public static final String FILENAME = "filename";
public static final String FILENAME_DESC = "The name of the file";
public static final String SIZE = "dropbox.size";
public static final String SIZE_DESC = "The size of the file";
public static final String TIMESTAMP = "dropbox.timestamp";
public static final String TIMESTAMP_DESC = "The server modified time, when the file was uploaded to Dropbox";
public static final String REVISION = "dropbox.revision";
public static final String REVISION_DESC = "Revision of the file";
public static final String ERROR_MESSAGE = "error.message";
public static final String ERROR_MESSAGE_DESC = "The error message returned by Dropbox when the fetch of a file fails";
}

View File

@ -16,6 +16,13 @@
*/ */
package org.apache.nifi.processors.dropbox; package org.apache.nifi.processors.dropbox;
import static org.apache.nifi.processors.dropbox.DropboxAttributes.FILENAME;
import static org.apache.nifi.processors.dropbox.DropboxAttributes.ID;
import static org.apache.nifi.processors.dropbox.DropboxAttributes.PATH;
import static org.apache.nifi.processors.dropbox.DropboxAttributes.REVISION;
import static org.apache.nifi.processors.dropbox.DropboxAttributes.SIZE;
import static org.apache.nifi.processors.dropbox.DropboxAttributes.TIMESTAMP;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
@ -31,13 +38,6 @@ import org.apache.nifi.serialization.record.RecordSchema;
public class DropboxFileInfo implements ListableEntity { public class DropboxFileInfo implements ListableEntity {
public static final String ID = "dropbox.id";
public static final String PATH = "path";
public static final String FILENAME = "filename";
public static final String SIZE = "dropbox.size";
public static final String TIMESTAMP = "dropbox.timestamp";
public static final String REVISION = "dropbox.revision";
private static final RecordSchema SCHEMA; private static final RecordSchema SCHEMA;
static { static {

View File

@ -19,12 +19,12 @@ package org.apache.nifi.processors.dropbox;
import java.util.function.Function; import java.util.function.Function;
public enum DropboxFlowFileAttribute { public enum DropboxFlowFileAttribute {
ID(DropboxFileInfo.ID, DropboxFileInfo::getId), ID(DropboxAttributes.ID, DropboxFileInfo::getId),
PATH(DropboxFileInfo.PATH, DropboxFileInfo::getPath), PATH(DropboxAttributes.PATH, DropboxFileInfo::getPath),
FILENAME(DropboxFileInfo.FILENAME, DropboxFileInfo::getName), FILENAME(DropboxAttributes.FILENAME, DropboxFileInfo::getName),
SIZE(DropboxFileInfo.SIZE, fileInfo -> String.valueOf(fileInfo.getSize())), SIZE(DropboxAttributes.SIZE, fileInfo -> String.valueOf(fileInfo.getSize())),
TIMESTAMP(DropboxFileInfo.TIMESTAMP, fileInfo -> String.valueOf(fileInfo.getTimestamp())), TIMESTAMP(DropboxAttributes.TIMESTAMP, fileInfo -> String.valueOf(fileInfo.getTimestamp())),
REVISION(DropboxFileInfo.REVISION, DropboxFileInfo::getRevision); REVISION(DropboxAttributes.REVISION, DropboxFileInfo::getRevision);
private final String name; private final String name;
private final Function<DropboxFileInfo, String> fromFileInfo; private final Function<DropboxFileInfo, String> fromFileInfo;

View File

@ -16,12 +16,18 @@
*/ */
package org.apache.nifi.processors.dropbox; package org.apache.nifi.processors.dropbox;
import static java.lang.String.format;
import static java.lang.String.valueOf;
import com.dropbox.core.DbxRequestConfig; import com.dropbox.core.DbxRequestConfig;
import com.dropbox.core.http.HttpRequestor; import com.dropbox.core.http.HttpRequestor;
import com.dropbox.core.http.OkHttp3Requestor; import com.dropbox.core.http.OkHttp3Requestor;
import com.dropbox.core.oauth.DbxCredential; import com.dropbox.core.oauth.DbxCredential;
import com.dropbox.core.v2.DbxClientV2; import com.dropbox.core.v2.DbxClientV2;
import com.dropbox.core.v2.files.FileMetadata;
import java.net.Proxy; import java.net.Proxy;
import java.util.HashMap;
import java.util.Map;
import okhttp3.Credentials; import okhttp3.Credentials;
import okhttp3.OkHttpClient; import okhttp3.OkHttpClient;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
@ -32,6 +38,8 @@ import org.apache.nifi.proxy.ProxyConfiguration;
public interface DropboxTrait { public interface DropboxTrait {
String DROPBOX_HOME_URL = "https://www.dropbox.com/home";
PropertyDescriptor CREDENTIAL_SERVICE = new PropertyDescriptor.Builder() PropertyDescriptor CREDENTIAL_SERVICE = new PropertyDescriptor.Builder()
.name("dropbox-credential-service") .name("dropbox-credential-service")
.displayName("Dropbox Credential Service") .displayName("Dropbox Credential Service")
@ -41,9 +49,10 @@ public interface DropboxTrait {
.required(true) .required(true)
.build(); .build();
default DbxClientV2 getDropboxApiClient(ProcessContext context, String identifier) {
default DbxClientV2 getDropboxApiClient(ProcessContext context, ProxyConfiguration proxyConfiguration, String clientId) { final ProxyConfiguration proxyConfiguration = ProxyConfiguration.getConfiguration(context);
OkHttpClient.Builder okHttpClientBuilder = OkHttp3Requestor.defaultOkHttpClientBuilder(); final String dropboxClientId = format("%s-%s", getClass().getSimpleName(), identifier);
final OkHttpClient.Builder okHttpClientBuilder = OkHttp3Requestor.defaultOkHttpClientBuilder();
if (!Proxy.Type.DIRECT.equals(proxyConfiguration.getProxyType())) { if (!Proxy.Type.DIRECT.equals(proxyConfiguration.getProxyType())) {
okHttpClientBuilder.proxy(proxyConfiguration.createProxy()); okHttpClientBuilder.proxy(proxyConfiguration.createProxy());
@ -58,16 +67,37 @@ public interface DropboxTrait {
} }
} }
HttpRequestor httpRequestor = new OkHttp3Requestor(okHttpClientBuilder.build()); final HttpRequestor httpRequestor = new OkHttp3Requestor(okHttpClientBuilder.build());
DbxRequestConfig config = DbxRequestConfig.newBuilder(clientId) final DbxRequestConfig config = DbxRequestConfig.newBuilder(dropboxClientId)
.withHttpRequestor(httpRequestor) .withHttpRequestor(httpRequestor)
.build(); .build();
final DropboxCredentialService credentialService = context.getProperty(CREDENTIAL_SERVICE) final DropboxCredentialService credentialService = context.getProperty(CREDENTIAL_SERVICE)
.asControllerService(DropboxCredentialService.class); .asControllerService(DropboxCredentialService.class);
DropboxCredentialDetails credential = credentialService.getDropboxCredential(); final DropboxCredentialDetails credential = credentialService.getDropboxCredential();
return new DbxClientV2(config, new DbxCredential(credential.getAccessToken(), -1L, return new DbxClientV2(config, new DbxCredential(credential.getAccessToken(), -1L,
credential.getRefreshToken(), credential.getAppKey(), credential.getAppSecret())); credential.getRefreshToken(), credential.getAppKey(), credential.getAppSecret()));
} }
default String convertFolderName(String folderName) {
return "/".equals(folderName) ? "" : folderName;
}
default String getParentPath(String fullPath) {
final int idx = fullPath.lastIndexOf("/");
final String parentPath = fullPath.substring(0, idx);
return "".equals(parentPath) ? "/" : parentPath;
}
default Map<String, String> createAttributeMap(FileMetadata fileMetadata) {
final Map<String, String> attributes = new HashMap<>();
attributes.put(DropboxAttributes.ID, fileMetadata.getId());
attributes.put(DropboxAttributes.PATH, getParentPath(fileMetadata.getPathDisplay()));
attributes.put(DropboxAttributes.FILENAME, fileMetadata.getName());
attributes.put(DropboxAttributes.SIZE, valueOf(fileMetadata.getSize()));
attributes.put(DropboxAttributes.REVISION, fileMetadata.getRev());
attributes.put(DropboxAttributes.TIMESTAMP, valueOf(fileMetadata.getServerModified().getTime()));
return attributes;
}
} }

View File

@ -16,16 +16,33 @@
*/ */
package org.apache.nifi.processors.dropbox; package org.apache.nifi.processors.dropbox;
import static java.lang.String.format; import static org.apache.nifi.processors.dropbox.DropboxAttributes.ERROR_MESSAGE;
import static org.apache.nifi.processors.dropbox.DropboxAttributes.ERROR_MESSAGE_DESC;
import static org.apache.nifi.processors.dropbox.DropboxAttributes.FILENAME;
import static org.apache.nifi.processors.dropbox.DropboxAttributes.FILENAME_DESC;
import static org.apache.nifi.processors.dropbox.DropboxAttributes.ID;
import static org.apache.nifi.processors.dropbox.DropboxAttributes.ID_DESC;
import static org.apache.nifi.processors.dropbox.DropboxAttributes.PATH;
import static org.apache.nifi.processors.dropbox.DropboxAttributes.PATH_DESC;
import static org.apache.nifi.processors.dropbox.DropboxAttributes.REVISION;
import static org.apache.nifi.processors.dropbox.DropboxAttributes.REVISION_DESC;
import static org.apache.nifi.processors.dropbox.DropboxAttributes.SIZE;
import static org.apache.nifi.processors.dropbox.DropboxAttributes.SIZE_DESC;
import static org.apache.nifi.processors.dropbox.DropboxAttributes.TIMESTAMP;
import static org.apache.nifi.processors.dropbox.DropboxAttributes.TIMESTAMP_DESC;
import com.dropbox.core.DbxDownloader;
import com.dropbox.core.DbxException; import com.dropbox.core.DbxException;
import com.dropbox.core.v2.DbxClientV2; import com.dropbox.core.v2.DbxClientV2;
import com.dropbox.core.v2.files.FileMetadata;
import java.io.InputStream; import java.io.InputStream;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern; import java.util.regex.Pattern;
import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.behavior.WritesAttribute;
@ -34,6 +51,7 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.FlowFile;
@ -49,15 +67,18 @@ import org.apache.nifi.proxy.ProxySpec;
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) @InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@Tags({"dropbox", "storage", "fetch"}) @Tags({"dropbox", "storage", "fetch"})
@CapabilityDescription("Fetches files from Dropbox. Designed to be used in tandem with ListDropbox.") @CapabilityDescription("Fetches files from Dropbox. Designed to be used in tandem with ListDropbox.")
@WritesAttribute(attribute = "error.message", description = "When a FlowFile is routed to 'failure', this attribute is added indicating why the file could " @SeeAlso({PutDropbox.class, ListDropbox.class})
+ "not be fetched from Dropbox.") @WritesAttributes({
@SeeAlso(ListDropbox.class) @WritesAttribute(attribute = ERROR_MESSAGE, description = ERROR_MESSAGE_DESC),
@WritesAttributes( @WritesAttribute(attribute = ID, description = ID_DESC),
@WritesAttribute(attribute = FetchDropbox.ERROR_MESSAGE_ATTRIBUTE, description = "The error message returned by Dropbox when the fetch of a file fails.")) @WritesAttribute(attribute = PATH, description = PATH_DESC),
@WritesAttribute(attribute = FILENAME, description = FILENAME_DESC),
@WritesAttribute(attribute = SIZE, description = SIZE_DESC),
@WritesAttribute(attribute = TIMESTAMP, description = TIMESTAMP_DESC),
@WritesAttribute(attribute = REVISION, description = REVISION_DESC)}
)
public class FetchDropbox extends AbstractProcessor implements DropboxTrait { public class FetchDropbox extends AbstractProcessor implements DropboxTrait {
public static final String ERROR_MESSAGE_ATTRIBUTE = "error.message";
public static final PropertyDescriptor FILE = new PropertyDescriptor public static final PropertyDescriptor FILE = new PropertyDescriptor
.Builder().name("file") .Builder().name("file")
.displayName("File") .displayName("File")
@ -83,7 +104,7 @@ public class FetchDropbox extends AbstractProcessor implements DropboxTrait {
.description("A FlowFile will be routed here for each File for which fetch was attempted but failed.") .description("A FlowFile will be routed here for each File for which fetch was attempted but failed.")
.build(); .build();
public static final Set<Relationship> relationships = Collections.unmodifiableSet(new HashSet<>(Arrays.asList( public static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
REL_SUCCESS, REL_SUCCESS,
REL_FAILURE REL_FAILURE
))); )));
@ -96,9 +117,11 @@ public class FetchDropbox extends AbstractProcessor implements DropboxTrait {
private DbxClientV2 dropboxApiClient; private DbxClientV2 dropboxApiClient;
private DbxDownloader<FileMetadata> dbxDownloader;
@Override @Override
public Set<Relationship> getRelationships() { public Set<Relationship> getRelationships() {
return relationships; return RELATIONSHIPS;
} }
@Override @Override
@ -108,14 +131,19 @@ public class FetchDropbox extends AbstractProcessor implements DropboxTrait {
@OnScheduled @OnScheduled
public void onScheduled(final ProcessContext context) { public void onScheduled(final ProcessContext context) {
final ProxyConfiguration proxyConfiguration = ProxyConfiguration.getConfiguration(context); dropboxApiClient = getDropboxApiClient(context, getIdentifier());
String dropboxClientId = format("%s-%s", getClass().getSimpleName(), getIdentifier()); }
dropboxApiClient = getDropboxApiClient(context, proxyConfiguration, dropboxClientId);
@OnUnscheduled
public void shutdown() {
if (dbxDownloader != null) {
dbxDownloader.close();
}
} }
@Override @Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get(); final FlowFile flowFile = session.get();
if (flowFile == null) { if (flowFile == null) {
return; return;
} }
@ -124,24 +152,35 @@ public class FetchDropbox extends AbstractProcessor implements DropboxTrait {
fileIdentifier = correctFilePath(fileIdentifier); fileIdentifier = correctFilePath(fileIdentifier);
FlowFile outFlowFile = flowFile; FlowFile outFlowFile = flowFile;
final long startNanos = System.nanoTime();
try { try {
fetchFile(fileIdentifier, session, outFlowFile); FileMetadata fileMetadata = fetchFile(fileIdentifier, session, outFlowFile);
final Map<String, String> attributes = createAttributeMap(fileMetadata);
outFlowFile = session.putAllAttributes(outFlowFile, attributes);
String url = DROPBOX_HOME_URL + fileMetadata.getPathDisplay();
final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
session.getProvenanceReporter().fetch(flowFile, url, transferMillis);
session.transfer(outFlowFile, REL_SUCCESS); session.transfer(outFlowFile, REL_SUCCESS);
} catch (Exception e) { } catch (Exception e) {
handleError(session, flowFile, fileIdentifier, e); handleError(session, flowFile, fileIdentifier, e);
} }
} }
private void fetchFile(String fileId, ProcessSession session, FlowFile outFlowFile) throws DbxException { private FileMetadata fetchFile(String fileId, ProcessSession session, FlowFile outFlowFile) throws DbxException {
InputStream dropboxInputStream = dropboxApiClient.files() try (DbxDownloader<FileMetadata> downloader = dropboxApiClient.files().download(fileId)) {
.download(fileId) dbxDownloader = downloader;
.getInputStream(); final InputStream dropboxInputStream = downloader.getInputStream();
session.importFrom(dropboxInputStream, outFlowFile); session.importFrom(dropboxInputStream, outFlowFile);
return downloader.getResult();
}
} }
private void handleError(ProcessSession session, FlowFile flowFile, String fileId, Exception e) { private void handleError(ProcessSession session, FlowFile flowFile, String fileId, Exception e) {
getLogger().error("Error while fetching and processing file with id '{}'", fileId, e); getLogger().error("Error while fetching and processing file with id '{}'", fileId, e);
FlowFile outFlowFile = session.putAttribute(flowFile, ERROR_MESSAGE_ATTRIBUTE, e.getMessage()); final FlowFile outFlowFile = session.putAttribute(flowFile, ERROR_MESSAGE, e.getMessage());
session.penalize(outFlowFile);
session.transfer(outFlowFile, REL_FAILURE); session.transfer(outFlowFile, REL_FAILURE);
} }

View File

@ -18,6 +18,18 @@ package org.apache.nifi.processors.dropbox;
import static java.lang.String.format; import static java.lang.String.format;
import static java.util.stream.Collectors.toList; import static java.util.stream.Collectors.toList;
import static org.apache.nifi.processors.dropbox.DropboxAttributes.FILENAME;
import static org.apache.nifi.processors.dropbox.DropboxAttributes.FILENAME_DESC;
import static org.apache.nifi.processors.dropbox.DropboxAttributes.ID;
import static org.apache.nifi.processors.dropbox.DropboxAttributes.ID_DESC;
import static org.apache.nifi.processors.dropbox.DropboxAttributes.PATH;
import static org.apache.nifi.processors.dropbox.DropboxAttributes.PATH_DESC;
import static org.apache.nifi.processors.dropbox.DropboxAttributes.REVISION;
import static org.apache.nifi.processors.dropbox.DropboxAttributes.REVISION_DESC;
import static org.apache.nifi.processors.dropbox.DropboxAttributes.SIZE;
import static org.apache.nifi.processors.dropbox.DropboxAttributes.SIZE_DESC;
import static org.apache.nifi.processors.dropbox.DropboxAttributes.TIMESTAMP;
import static org.apache.nifi.processors.dropbox.DropboxAttributes.TIMESTAMP_DESC;
import com.dropbox.core.DbxException; import com.dropbox.core.DbxException;
import com.dropbox.core.v2.DbxClientV2; import com.dropbox.core.v2.DbxClientV2;
@ -69,17 +81,19 @@ import org.apache.nifi.serialization.record.RecordSchema;
" This Processor is designed to run on Primary Node only in a cluster. If the primary node changes, the new Primary Node will pick up where the" + " This Processor is designed to run on Primary Node only in a cluster. If the primary node changes, the new Primary Node will pick up where the" +
" previous node left off without duplicating all of the data.") " previous node left off without duplicating all of the data.")
@InputRequirement(Requirement.INPUT_FORBIDDEN) @InputRequirement(Requirement.INPUT_FORBIDDEN)
@WritesAttributes({@WritesAttribute(attribute = DropboxFileInfo.ID, description = "The Dropbox identifier of the file"), @WritesAttributes({
@WritesAttribute(attribute = DropboxFileInfo.PATH, description = "The folder path where the file is located"), @WritesAttribute(attribute = ID, description = ID_DESC),
@WritesAttribute(attribute = DropboxFileInfo.FILENAME, description = "The name of the file"), @WritesAttribute(attribute = PATH, description = PATH_DESC),
@WritesAttribute(attribute = DropboxFileInfo.SIZE, description = "The size of the file"), @WritesAttribute(attribute = FILENAME, description = FILENAME_DESC),
@WritesAttribute(attribute = DropboxFileInfo.TIMESTAMP, description = "The server modified time, when the file was uploaded to Dropbox"), @WritesAttribute(attribute = SIZE, description = SIZE_DESC),
@WritesAttribute(attribute = DropboxFileInfo.REVISION, description = "Revision of the file")}) @WritesAttribute(attribute = TIMESTAMP, description = TIMESTAMP_DESC),
@WritesAttribute(attribute = REVISION, description = REVISION_DESC)})
@Stateful(scopes = {Scope.CLUSTER}, description = "The processor stores necessary data to be able to keep track what files have been listed already. " + @Stateful(scopes = {Scope.CLUSTER}, description = "The processor stores necessary data to be able to keep track what files have been listed already. " +
"What exactly needs to be stored depends on the 'Listing Strategy'.") "What exactly needs to be stored depends on the 'Listing Strategy'.")
@SeeAlso(FetchDropbox.class) @SeeAlso({FetchDropbox.class, PutDropbox.class})
@DefaultSchedule(strategy = SchedulingStrategy.TIMER_DRIVEN, period = "1 min") @DefaultSchedule(strategy = SchedulingStrategy.TIMER_DRIVEN, period = "1 min")
public class ListDropbox extends AbstractListProcessor<DropboxFileInfo> implements DropboxTrait { public class ListDropbox extends AbstractListProcessor<DropboxFileInfo> implements DropboxTrait {
public static final PropertyDescriptor FOLDER = new PropertyDescriptor.Builder() public static final PropertyDescriptor FOLDER = new PropertyDescriptor.Builder()
.name("folder") .name("folder")
.displayName("Folder") .displayName("Folder")
@ -148,9 +162,7 @@ public class ListDropbox extends AbstractListProcessor<DropboxFileInfo> implemen
@OnScheduled @OnScheduled
public void onScheduled(final ProcessContext context) { public void onScheduled(final ProcessContext context) {
final ProxyConfiguration proxyConfiguration = ProxyConfiguration.getConfiguration(context); dropboxApiClient = getDropboxApiClient(context, getIdentifier());
String dropboxClientId = format("%s-%s", getClass().getSimpleName(), getIdentifier());
dropboxApiClient = getDropboxApiClient(context, proxyConfiguration, dropboxClientId);
} }
@Override @Override
@ -189,20 +201,20 @@ public class ListDropbox extends AbstractListProcessor<DropboxFileInfo> implemen
try { try {
Predicate<FileMetadata> metadataFilter = createMetadataFilter(minTimestamp, minAge); Predicate<FileMetadata> metadataFilter = createMetadataFilter(minTimestamp, minAge);
ListFolderBuilder listFolderBuilder = dropboxApiClient.files().listFolderBuilder(convertFolderName(folderName)); final ListFolderBuilder listFolderBuilder = dropboxApiClient.files().listFolderBuilder(convertFolderName(folderName));
ListFolderResult result = listFolderBuilder ListFolderResult result = listFolderBuilder
.withRecursive(recursive) .withRecursive(recursive)
.start(); .start();
List<FileMetadata> metadataList = new ArrayList<>(filterMetadata(result, metadataFilter)); final List<FileMetadata> metadataList = new ArrayList<>(filterMetadata(result, metadataFilter));
while (result.getHasMore()) { while (result.getHasMore()) {
result = dropboxApiClient.files().listFolderContinue(result.getCursor()); result = dropboxApiClient.files().listFolderContinue(result.getCursor());
metadataList.addAll(filterMetadata(result, metadataFilter)); metadataList.addAll(filterMetadata(result, metadataFilter));
} }
for (FileMetadata metadata : metadataList) { for (final FileMetadata metadata : metadataList) {
DropboxFileInfo.Builder builder = new DropboxFileInfo.Builder() final DropboxFileInfo.Builder builder = new DropboxFileInfo.Builder()
.id(metadata.getId()) .id(metadata.getId())
.path(getParentPath(metadata.getPathDisplay())) .path(getParentPath(metadata.getPathDisplay()))
.name(metadata.getName()) .name(metadata.getName())
@ -268,13 +280,5 @@ public class ListDropbox extends AbstractListProcessor<DropboxFileInfo> implemen
.collect(toList()); .collect(toList());
} }
private String getParentPath(String fullPath) {
int idx = fullPath.lastIndexOf("/");
String parentPath = fullPath.substring(0, idx);
return "".equals(parentPath) ? "/" : parentPath;
}
private String convertFolderName(String folderName) {
return "/".equals(folderName) ? "" : folderName;
}
} }

View File

@ -0,0 +1,358 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.dropbox;
import static java.lang.String.format;
import static org.apache.nifi.processors.dropbox.DropboxAttributes.ERROR_MESSAGE;
import static org.apache.nifi.processors.dropbox.DropboxAttributes.ERROR_MESSAGE_DESC;
import static org.apache.nifi.processors.dropbox.DropboxAttributes.FILENAME;
import static org.apache.nifi.processors.dropbox.DropboxAttributes.FILENAME_DESC;
import static org.apache.nifi.processors.dropbox.DropboxAttributes.ID;
import static org.apache.nifi.processors.dropbox.DropboxAttributes.ID_DESC;
import static org.apache.nifi.processors.dropbox.DropboxAttributes.PATH;
import static org.apache.nifi.processors.dropbox.DropboxAttributes.PATH_DESC;
import static org.apache.nifi.processors.dropbox.DropboxAttributes.REVISION;
import static org.apache.nifi.processors.dropbox.DropboxAttributes.REVISION_DESC;
import static org.apache.nifi.processors.dropbox.DropboxAttributes.SIZE;
import static org.apache.nifi.processors.dropbox.DropboxAttributes.SIZE_DESC;
import static org.apache.nifi.processors.dropbox.DropboxAttributes.TIMESTAMP;
import static org.apache.nifi.processors.dropbox.DropboxAttributes.TIMESTAMP_DESC;
import com.dropbox.core.DbxException;
import com.dropbox.core.DbxUploader;
import com.dropbox.core.RateLimitException;
import com.dropbox.core.v2.DbxClientV2;
import com.dropbox.core.v2.files.CommitInfo;
import com.dropbox.core.v2.files.FileMetadata;
import com.dropbox.core.v2.files.UploadErrorException;
import com.dropbox.core.v2.files.UploadSessionAppendV2Uploader;
import com.dropbox.core.v2.files.UploadSessionCursor;
import com.dropbox.core.v2.files.UploadSessionFinishUploader;
import com.dropbox.core.v2.files.UploadSessionStartUploader;
import com.dropbox.core.v2.files.UploadUploader;
import com.dropbox.core.v2.files.WriteMode;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.proxy.ProxyConfiguration;
import org.apache.nifi.proxy.ProxySpec;
/**
* This processor uploads objects to Dropbox.
*/
@SeeAlso({ListDropbox.class, FetchDropbox.class})
@InputRequirement(Requirement.INPUT_REQUIRED)
@Tags({"dropbox", "storage", "put"})
@CapabilityDescription("Puts content to a Dropbox folder.")
@ReadsAttribute(attribute = "filename", description = "Uses the FlowFile's filename as the filename for the Dropbox object.")
@WritesAttributes({
@WritesAttribute(attribute = ERROR_MESSAGE, description = ERROR_MESSAGE_DESC),
@WritesAttribute(attribute = ID, description = ID_DESC),
@WritesAttribute(attribute = PATH, description = PATH_DESC),
@WritesAttribute(attribute = FILENAME, description = FILENAME_DESC),
@WritesAttribute(attribute = SIZE, description = SIZE_DESC),
@WritesAttribute(attribute = TIMESTAMP, description = TIMESTAMP_DESC),
@WritesAttribute(attribute = REVISION, description = REVISION_DESC)})
public class PutDropbox extends AbstractProcessor implements DropboxTrait {
public static final int SINGLE_UPLOAD_LIMIT_IN_BYTES = 150 * 1024 * 1024;
public static final String IGNORE_RESOLUTION = "ignore";
public static final String OVERWRITE_RESOLUTION = "overwrite";
public static final String FAIL_RESOLUTION = "fail";
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("Files that have been successfully written to Dropbox are transferred to this relationship.")
.build();
public static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("Files that could not be written to Dropbox for some reason are transferred to this relationship.")
.build();
public static final PropertyDescriptor FOLDER = new PropertyDescriptor.Builder()
.name("folder")
.displayName("Folder")
.description("The path of the Dropbox folder to upload files to. "
+ "The folder will be created if it does not exist yet.")
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.required(true)
.addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("/.*")))
.defaultValue("/")
.build();
public static final PropertyDescriptor FILE_NAME = new PropertyDescriptor.Builder()
.name("file-name")
.displayName("Filename")
.description("The full name of the file to upload.")
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.defaultValue("${filename}")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor CONFLICT_RESOLUTION = new PropertyDescriptor.Builder()
.name("conflict-resolution-strategy")
.displayName("Conflict Resolution Strategy")
.description("Indicates what should happen when a file with the same name already exists in the specified Dropbox folder.")
.required(true)
.defaultValue(FAIL_RESOLUTION)
.allowableValues(FAIL_RESOLUTION, IGNORE_RESOLUTION, OVERWRITE_RESOLUTION)
.build();
public static final PropertyDescriptor CHUNKED_UPLOAD_SIZE = new PropertyDescriptor.Builder()
.name("chunked-upload-size")
.displayName("Chunked Upload Size")
.description("Defines the size of a chunk. Used when a FlowFile's size exceeds 'Chunked Upload Threshold' and content is uploaded in smaller chunks. "
+ "It is recommended to specify chunked upload size smaller than 'Chunked Upload Threshold' and as multiples of 4 MB. "
+ "Maximum allowed value is 150 MB.")
.defaultValue("8 MB")
.addValidator(StandardValidators.createDataSizeBoundsValidator(1, SINGLE_UPLOAD_LIMIT_IN_BYTES))
.required(false)
.build();
public static final PropertyDescriptor CHUNKED_UPLOAD_THRESHOLD = new PropertyDescriptor.Builder()
.name("chunked-upload-threshold")
.displayName("Chunked Upload Threshold")
.description("The maximum size of the content which is uploaded at once. FlowFiles larger than this threshold are uploaded in chunks. "
+ "Maximum allowed value is 150 MB.")
.defaultValue("150 MB")
.addValidator(StandardValidators.createDataSizeBoundsValidator(1, SINGLE_UPLOAD_LIMIT_IN_BYTES))
.required(false)
.build();
private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
CREDENTIAL_SERVICE,
FOLDER,
FILE_NAME,
CONFLICT_RESOLUTION,
CHUNKED_UPLOAD_THRESHOLD,
CHUNKED_UPLOAD_SIZE,
ProxyConfiguration.createProxyConfigPropertyDescriptor(false, ProxySpec.HTTP_AUTH)
));
private static final Set<Relationship> RELATIONSHIPS;
static {
final Set<Relationship> rels = new HashSet<>();
rels.add(REL_SUCCESS);
rels.add(REL_FAILURE);
RELATIONSHIPS = Collections.unmodifiableSet(rels);
}
private DbxClientV2 dropboxApiClient;
private DbxUploader<?, ?, ?> dbxUploader;
@Override
public Set<Relationship> getRelationships() {
return RELATIONSHIPS;
}
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return PROPERTIES;
}
@OnScheduled
public void onScheduled(final ProcessContext context) {
dropboxApiClient = getDropboxApiClient(context, getIdentifier());
}
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
final String folder = context.getProperty(FOLDER).evaluateAttributeExpressions(flowFile).getValue();
final String filename = context.getProperty(FILE_NAME).evaluateAttributeExpressions(flowFile).getValue();
final long chunkUploadThreshold = context.getProperty(CHUNKED_UPLOAD_THRESHOLD)
.asDataSize(DataUnit.B)
.longValue();
final long uploadChunkSize = context.getProperty(CHUNKED_UPLOAD_SIZE)
.asDataSize(DataUnit.B)
.longValue();
final String conflictResolution = context.getProperty(CONFLICT_RESOLUTION).getValue();
final long size = flowFile.getSize();
final String uploadPath = convertFolderName(folder) + "/" + filename;
final long startNanos = System.nanoTime();
FileMetadata fileMetadata = null;
try {
try (final InputStream rawIn = session.read(flowFile)) {
if (size <= chunkUploadThreshold) {
try (UploadUploader uploader = createUploadUploader(uploadPath, conflictResolution)) {
fileMetadata = uploader.uploadAndFinish(rawIn);
}
} else {
fileMetadata = uploadLargeFileInChunks(uploadPath, rawIn, size, uploadChunkSize, conflictResolution);
}
} catch (UploadErrorException e) {
handleUploadError(conflictResolution, uploadPath, e);
} catch (RateLimitException e) {
context.yield();
throw new ProcessException("Dropbox API rate limit exceeded while uploading file", e);
}
if (fileMetadata != null) {
final Map<String, String> attributes = createAttributeMap(fileMetadata);
String url = DROPBOX_HOME_URL + fileMetadata.getPathDisplay();
flowFile = session.putAllAttributes(flowFile, attributes);
final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
session.getProvenanceReporter().send(flowFile, url, transferMillis);
}
session.transfer(flowFile, REL_SUCCESS);
} catch (Exception e) {
getLogger().error("Exception occurred while uploading file '{}' to Dropbox folder '{}'", filename, folder, e);
flowFile = session.putAttribute(flowFile, ERROR_MESSAGE, e.getMessage());
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
}
}
@OnUnscheduled
public void shutdown() {
if (dbxUploader != null) {
dbxUploader.close();
}
}
private void handleUploadError(final String conflictResolution, final String uploadPath, final UploadErrorException e) throws UploadErrorException {
if (e.errorValue.isPath() && e.errorValue.getPathValue().getReason().isConflict()) {
if (IGNORE_RESOLUTION.equals(conflictResolution)) {
getLogger().info("File with the same name [{}] already exists. Remote file is not modified due to {} being set to '{}'.",
uploadPath, CONFLICT_RESOLUTION.getDisplayName(), conflictResolution);
return;
} else if (conflictResolution.equals(FAIL_RESOLUTION)) {
throw new ProcessException(format("File with the same name [%s] already exists.", uploadPath), e);
}
}
throw new ProcessException(e);
}
private FileMetadata uploadLargeFileInChunks(String path, InputStream rawIn, long size, long uploadChunkSize, String conflictResolution) throws DbxException, IOException {
final String sessionId;
try (UploadSessionStartUploader uploader = createUploadSessionStartUploader()) {
sessionId = uploader.uploadAndFinish(rawIn, uploadChunkSize).getSessionId();
}
long uploadedBytes = uploadChunkSize;
UploadSessionCursor cursor = new UploadSessionCursor(sessionId, uploadedBytes);
while (size - uploadedBytes > uploadChunkSize) {
try (UploadSessionAppendV2Uploader uploader = createUploadSessionAppendV2Uploader(cursor)) {
uploader.uploadAndFinish(rawIn, uploadChunkSize);
uploadedBytes += uploadChunkSize;
cursor = new UploadSessionCursor(sessionId, uploadedBytes);
}
}
final long remainingBytes = size - uploadedBytes;
final CommitInfo commitInfo = CommitInfo.newBuilder(path)
.withMode(getWriteMode(conflictResolution))
.withStrictConflict(true)
.withClientModified(new Date(System.currentTimeMillis()))
.build();
try (UploadSessionFinishUploader uploader = createUploadSessionFinishUploader(cursor, commitInfo)) {
return uploader.uploadAndFinish(rawIn, remainingBytes);
}
}
private WriteMode getWriteMode(String conflictResolution) {
if (OVERWRITE_RESOLUTION.equals(conflictResolution)) {
return WriteMode.OVERWRITE;
} else {
return WriteMode.ADD;
}
}
private UploadUploader createUploadUploader(String path, String conflictResolution) throws DbxException {
final UploadUploader uploadUploader = dropboxApiClient
.files()
.uploadBuilder(path)
.withMode(getWriteMode(conflictResolution))
.withStrictConflict(true)
.start();
dbxUploader = uploadUploader;
return uploadUploader;
}
private UploadSessionStartUploader createUploadSessionStartUploader() throws DbxException {
final UploadSessionStartUploader sessionStartUploader = dropboxApiClient
.files()
.uploadSessionStart();
dbxUploader = sessionStartUploader;
return sessionStartUploader;
}
private UploadSessionAppendV2Uploader createUploadSessionAppendV2Uploader(UploadSessionCursor cursor) throws DbxException {
final UploadSessionAppendV2Uploader sessionAppendV2Uploader = dropboxApiClient
.files()
.uploadSessionAppendV2(cursor);
dbxUploader = sessionAppendV2Uploader;
return sessionAppendV2Uploader;
}
private UploadSessionFinishUploader createUploadSessionFinishUploader(UploadSessionCursor cursor, CommitInfo commitInfo) throws DbxException {
final UploadSessionFinishUploader sessionFinishUploader = dropboxApiClient
.files()
.uploadSessionFinish(cursor, commitInfo);
dbxUploader = sessionFinishUploader;
return sessionFinishUploader;
}
}

View File

@ -14,3 +14,4 @@
# limitations under the License. # limitations under the License.
org.apache.nifi.processors.dropbox.ListDropbox org.apache.nifi.processors.dropbox.ListDropbox
org.apache.nifi.processors.dropbox.FetchDropbox org.apache.nifi.processors.dropbox.FetchDropbox
org.apache.nifi.processors.dropbox.PutDropbox

View File

@ -0,0 +1,128 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.dropbox;
import static java.util.stream.Collectors.toSet;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.when;
import com.dropbox.core.v2.DbxClientV2;
import com.dropbox.core.v2.files.FileMetadata;
import java.util.Collections;
import java.util.Date;
import java.util.Set;
import org.apache.nifi.dropbox.credentials.service.DropboxCredentialService;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.junit.jupiter.api.BeforeEach;
import org.mockito.Mock;
public class AbstractDropboxTest {
public static final String TEST_FOLDER = "/testFolder";
public static final String FILENAME_1 = "file_name_1";
public static final String FILENAME_2 = "file_name_2";
public static final String FILE_ID_1 = "id:odTlUvbpIEAAAAAAAAAGGQ";
public static final String FILE_ID_2 = "id:bdCQUvbpIEAABBAAAAAGKK";
public static final long CREATED_TIME = 1659707000;
public static final long SIZE = 125;
public static final String REVISION = "5e4ddb1320676a5c29261";
protected TestRunner testRunner;
@Mock
protected DbxClientV2 mockDropboxClient;
@Mock
private DropboxCredentialService mockCredentialService;
@BeforeEach
protected void setUp() throws Exception {
mockStandardDropboxCredentialService();
}
protected void assertProvenanceEvent(ProvenanceEventType eventType) {
Set<ProvenanceEventType> expectedEventTypes = Collections.singleton(eventType);
Set<ProvenanceEventType> actualEventTypes = testRunner.getProvenanceEvents().stream()
.map(ProvenanceEventRecord::getEventType)
.collect(toSet());
assertEquals(expectedEventTypes, actualEventTypes);
}
protected void assertNoProvenanceEvent() {
assertTrue(testRunner.getProvenanceEvents().isEmpty());
}
protected void mockStandardDropboxCredentialService() throws InitializationException {
String credentialServiceId = "dropbox_credentials";
when(mockCredentialService.getIdentifier()).thenReturn(credentialServiceId);
testRunner.addControllerService(credentialServiceId, mockCredentialService);
testRunner.enableControllerService(mockCredentialService);
testRunner.setProperty(FetchDropbox.CREDENTIAL_SERVICE, credentialServiceId);
}
protected FileMetadata createFileMetadata() {
return FileMetadata.newBuilder(FILENAME_1, FILE_ID_1,
new Date(CREATED_TIME),
new Date(CREATED_TIME),
REVISION, SIZE)
.withPathDisplay(getPath(TEST_FOLDER, FILENAME_1))
.withIsDownloadable(true)
.build();
}
protected FileMetadata createFileMetadata(
String id, String filename,
String parent,
long createdTime,
boolean isDownloadable) {
return FileMetadata.newBuilder(filename, id,
new Date(createdTime),
new Date(createdTime),
REVISION, SIZE)
.withPathDisplay(getPath(parent, filename))
.withIsDownloadable(isDownloadable)
.build();
}
protected FileMetadata createFileMetadata(String id,
String filename,
String parent,
long createdTime) {
return createFileMetadata(id, filename, parent, createdTime, true);
}
protected void assertOutFlowFileAttributes(MockFlowFile flowFile) {
assertOutFlowFileAttributes(flowFile, TEST_FOLDER);
}
protected void assertOutFlowFileAttributes(MockFlowFile flowFile, String folderName) {
flowFile.assertAttributeEquals(DropboxAttributes.ID, FILE_ID_1);
flowFile.assertAttributeEquals(DropboxAttributes.FILENAME, FILENAME_1);
flowFile.assertAttributeEquals(DropboxAttributes.PATH, folderName);
flowFile.assertAttributeEquals(DropboxAttributes.TIMESTAMP, Long.toString(CREATED_TIME));
flowFile.assertAttributeEquals(DropboxAttributes.SIZE, Long.toString(SIZE));
flowFile.assertAttributeEquals(DropboxAttributes.REVISION, REVISION);
}
protected String getPath(String folder, String filename) {
return "/".equals(folder) ? folder + filename : folder + "/" + filename;
}
}

View File

@ -16,7 +16,9 @@
*/ */
package org.apache.nifi.processors.dropbox; package org.apache.nifi.processors.dropbox;
import static java.lang.String.valueOf;
import static java.nio.charset.StandardCharsets.UTF_8; import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.nifi.processors.dropbox.DropboxAttributes.ERROR_MESSAGE;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import com.dropbox.core.DbxDownloader; import com.dropbox.core.DbxDownloader;
@ -28,12 +30,9 @@ import java.io.ByteArrayInputStream;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import org.apache.nifi.dropbox.credentials.service.DropboxCredentialService;
import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.proxy.ProxyConfiguration; import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners; import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
@ -42,36 +41,19 @@ import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.junit.jupiter.MockitoExtension;
@ExtendWith(MockitoExtension.class) @ExtendWith(MockitoExtension.class)
public class FetchDropboxTest { public class FetchDropboxTest extends AbstractDropboxTest {
public static final String FILE_ID_1 = "id:odTlUvbpIEAAAAAAAAAGGQ";
public static final String FILE_ID_2 = "id:odTlUvbpIEBBBBBBBBBGGQ";
public static final String FILENAME = "file_name";
public static final String FOLDER = "/testFolder";
public static final String SIZE = "125";
public static final String CREATED_TIME = "1659707000";
public static final String REVISION = "5e4ddb1320676a5c29261";
private TestRunner testRunner;
@Mock
private DbxClientV2 mockDropboxClient;
@Mock
private DropboxCredentialService credentialService;
@Mock @Mock
private DbxUserFilesRequests mockDbxUserFilesRequest; private DbxUserFilesRequests mockDbxUserFilesRequest;
@Mock @Mock
private DbxDownloader<FileMetadata> mockDbxDownloader; private DbxDownloader<FileMetadata> mockDbxDownloader;
@BeforeEach @BeforeEach
void setUp() throws Exception { public void setUp() throws Exception {
FetchDropbox testSubject = new FetchDropbox() { FetchDropbox testSubject = new FetchDropbox() {
@Override @Override
public DbxClientV2 getDropboxApiClient(ProcessContext context, ProxyConfiguration proxyConfiguration, String clientId) { public DbxClientV2 getDropboxApiClient(ProcessContext context, String id) {
return mockDropboxClient; return mockDropboxClient;
} }
}; };
@ -79,19 +61,18 @@ public class FetchDropboxTest {
testRunner = TestRunners.newTestRunner(testSubject); testRunner = TestRunners.newTestRunner(testSubject);
when(mockDropboxClient.files()).thenReturn(mockDbxUserFilesRequest); when(mockDropboxClient.files()).thenReturn(mockDbxUserFilesRequest);
super.setUp();
mockStandardDropboxCredentialService();
} }
@Test @Test
void testFileIsDownloadedById() throws Exception { void testFileIsDownloadedById() throws Exception {
testRunner.setProperty(FetchDropbox.FILE, "${dropbox.id}"); testRunner.setProperty(FetchDropbox.FILE, "${dropbox.id}");
when(mockDbxUserFilesRequest.download(FILE_ID_1)).thenReturn(mockDbxDownloader); when(mockDbxUserFilesRequest.download(FILE_ID_1)).thenReturn(mockDbxDownloader);
when(mockDbxDownloader.getInputStream()).thenReturn(new ByteArrayInputStream("content".getBytes(UTF_8))); when(mockDbxDownloader.getInputStream()).thenReturn(new ByteArrayInputStream("content".getBytes(UTF_8)));
when(mockDbxDownloader.getResult()).thenReturn(createFileMetadata());
MockFlowFile inputFlowFile = getMockFlowFile(FILE_ID_1); MockFlowFile inputFlowFile = getMockFlowFile();
testRunner.enqueue(inputFlowFile); testRunner.enqueue(inputFlowFile);
testRunner.run(); testRunner.run();
@ -99,17 +80,19 @@ public class FetchDropboxTest {
List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(FetchDropbox.REL_SUCCESS); List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(FetchDropbox.REL_SUCCESS);
MockFlowFile ff0 = flowFiles.get(0); MockFlowFile ff0 = flowFiles.get(0);
ff0.assertContentEquals("content"); ff0.assertContentEquals("content");
assertOutFlowFileAttributes(ff0, FILE_ID_1); assertOutFlowFileAttributes(ff0);
assertProvenanceEvent(ProvenanceEventType.FETCH);
} }
@Test @Test
void testFileIsDownloadedByPath() throws Exception { void testFileIsDownloadedByPath() throws Exception {
testRunner.setProperty(FetchDropbox.FILE, "${path}/${filename}"); testRunner.setProperty(FetchDropbox.FILE, "${path}/${filename}");
when(mockDbxUserFilesRequest.download(FOLDER + "/" + FILENAME)).thenReturn(mockDbxDownloader); when(mockDbxUserFilesRequest.download(getPath(TEST_FOLDER, FILENAME_1))).thenReturn(mockDbxDownloader);
when(mockDbxDownloader.getInputStream()).thenReturn(new ByteArrayInputStream("contentByPath".getBytes(UTF_8))); when(mockDbxDownloader.getInputStream()).thenReturn(new ByteArrayInputStream("contentByPath".getBytes(UTF_8)));
when(mockDbxDownloader.getResult()).thenReturn(createFileMetadata());
MockFlowFile inputFlowFile = getMockFlowFile(FILE_ID_1); MockFlowFile inputFlowFile = getMockFlowFile();
testRunner.enqueue(inputFlowFile); testRunner.enqueue(inputFlowFile);
testRunner.run(); testRunner.run();
@ -117,53 +100,38 @@ public class FetchDropboxTest {
List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(FetchDropbox.REL_SUCCESS); List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(FetchDropbox.REL_SUCCESS);
MockFlowFile ff0 = flowFiles.get(0); MockFlowFile ff0 = flowFiles.get(0);
ff0.assertContentEquals("contentByPath"); ff0.assertContentEquals("contentByPath");
assertOutFlowFileAttributes(ff0, FILE_ID_1); assertOutFlowFileAttributes(ff0);
assertProvenanceEvent(ProvenanceEventType.FETCH);
} }
@Test @Test
void testFetchFails() throws Exception { void testFetchFails() throws Exception {
testRunner.setProperty(FetchDropbox.FILE, "${dropbox.id}"); testRunner.setProperty(FetchDropbox.FILE, "${dropbox.id}");
when(mockDbxUserFilesRequest.download(FILE_ID_2)).thenThrow(new DbxException("Error in Dropbox")); when(mockDbxUserFilesRequest.download(FILE_ID_1)).thenThrow(new DbxException("Error in Dropbox"));
MockFlowFile inputFlowFile = getMockFlowFile(FILE_ID_2); MockFlowFile inputFlowFile = getMockFlowFile();
testRunner.enqueue(inputFlowFile); testRunner.enqueue(inputFlowFile);
testRunner.run(); testRunner.run();
testRunner.assertAllFlowFilesTransferred(FetchDropbox.REL_FAILURE, 1); testRunner.assertAllFlowFilesTransferred(FetchDropbox.REL_FAILURE, 1);
List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(FetchDropbox.REL_FAILURE); List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(FetchDropbox.REL_FAILURE);
MockFlowFile ff0 = flowFiles.get(0); MockFlowFile ff0 = flowFiles.get(0);
ff0.assertAttributeEquals("error.message", "Error in Dropbox"); ff0.assertAttributeEquals(ERROR_MESSAGE, "Error in Dropbox");
assertOutFlowFileAttributes(ff0, FILE_ID_2); assertOutFlowFileAttributes(ff0);
assertNoProvenanceEvent();
} }
private void mockStandardDropboxCredentialService() throws InitializationException { private MockFlowFile getMockFlowFile() {
String credentialServiceId = "dropbox_credentials";
when(credentialService.getIdentifier()).thenReturn(credentialServiceId);
testRunner.addControllerService(credentialServiceId, credentialService);
testRunner.enableControllerService(credentialService);
testRunner.setProperty(FetchDropbox.CREDENTIAL_SERVICE, credentialServiceId);
}
private MockFlowFile getMockFlowFile(String fileId) {
MockFlowFile inputFlowFile = new MockFlowFile(0); MockFlowFile inputFlowFile = new MockFlowFile(0);
Map<String, String> attributes = new HashMap<>(); Map<String, String> attributes = new HashMap<>();
attributes.put(DropboxFileInfo.ID, fileId); attributes.put(DropboxAttributes.ID, FILE_ID_1);
attributes.put(DropboxFileInfo.REVISION, REVISION); attributes.put(DropboxAttributes.REVISION, REVISION);
attributes.put(DropboxFileInfo.FILENAME, FILENAME); attributes.put(DropboxAttributes.FILENAME, FILENAME_1);
attributes.put(DropboxFileInfo.PATH, FOLDER); attributes.put(DropboxAttributes.PATH, TEST_FOLDER);
attributes.put(DropboxFileInfo.SIZE, SIZE); attributes.put(DropboxAttributes.SIZE, valueOf(SIZE));
attributes.put(DropboxFileInfo.TIMESTAMP, CREATED_TIME); attributes.put(DropboxAttributes.TIMESTAMP, valueOf(CREATED_TIME));
inputFlowFile.putAttributes(attributes); inputFlowFile.putAttributes(attributes);
return inputFlowFile; return inputFlowFile;
} }
private void assertOutFlowFileAttributes(MockFlowFile flowFile, String fileId) {
flowFile.assertAttributeEquals(DropboxFileInfo.ID, fileId);
flowFile.assertAttributeEquals(DropboxFileInfo.REVISION, REVISION);
flowFile.assertAttributeEquals(DropboxFileInfo.PATH, FOLDER);
flowFile.assertAttributeEquals(DropboxFileInfo.SIZE, SIZE);
flowFile.assertAttributeEquals(DropboxFileInfo.TIMESTAMP, CREATED_TIME);
flowFile.assertAttributeEquals(DropboxFileInfo.FILENAME, FILENAME);
}
} }

View File

@ -26,7 +26,6 @@ import static org.mockito.Mockito.when;
import com.dropbox.core.DbxException; import com.dropbox.core.DbxException;
import com.dropbox.core.v2.DbxClientV2; import com.dropbox.core.v2.DbxClientV2;
import com.dropbox.core.v2.files.DbxUserFilesRequests; import com.dropbox.core.v2.files.DbxUserFilesRequests;
import com.dropbox.core.v2.files.FileMetadata;
import com.dropbox.core.v2.files.FolderMetadata; import com.dropbox.core.v2.files.FolderMetadata;
import com.dropbox.core.v2.files.ListFolderBuilder; import com.dropbox.core.v2.files.ListFolderBuilder;
import com.dropbox.core.v2.files.ListFolderResult; import com.dropbox.core.v2.files.ListFolderResult;
@ -37,18 +36,14 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException; import java.io.IOException;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.Date;
import java.util.List; import java.util.List;
import java.util.Spliterator; import java.util.Spliterator;
import java.util.stream.StreamSupport; import java.util.stream.StreamSupport;
import org.apache.nifi.dropbox.credentials.service.DropboxCredentialService;
import org.apache.nifi.json.JsonRecordSetWriter; import org.apache.nifi.json.JsonRecordSetWriter;
import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.proxy.ProxyConfiguration;
import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.serialization.RecordSetWriterFactory; import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners; import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
@ -57,27 +52,16 @@ import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.junit.jupiter.MockitoExtension;
@ExtendWith(MockitoExtension.class) @ExtendWith(MockitoExtension.class)
public class ListDropboxTest { public class ListDropboxTest extends AbstractDropboxTest {
public static final String ID_1 = "id:11111"; public static final String FOLDER_ID = "id:11111";
public static final String ID_2 = "id:22222";
public static final String TEST_FOLDER = "/testFolder";
public static final String FILENAME_1 = "file_name_1";
public static final String FILENAME_2 = "file_name_2";
public static final long SIZE = 125;
public static final long CREATED_TIME = 1659707000;
public static final String REVISION = "5e4ddb1320676a5c29261";
public static final boolean IS_RECURSIVE = true; public static final boolean IS_RECURSIVE = true;
public static final long MIN_TIMESTAMP = 1659707000; public static final long MIN_TIMESTAMP = 1659707000;
public static final long OLD_CREATED_TIME = 1657375066; public static final long OLD_CREATED_TIME = 1657375066;
private TestRunner testRunner;
@Mock @Mock
private DbxClientV2 mockDropboxClient; private DbxClientV2 mockDropboxClient;
@Mock
private DropboxCredentialService credentialService;
@Mock @Mock
private DbxUserFilesRequests mockDbxUserFilesRequest; private DbxUserFilesRequests mockDbxUserFilesRequest;
@ -88,10 +72,10 @@ public class ListDropboxTest {
private ListFolderBuilder mockListFolderBuilder; private ListFolderBuilder mockListFolderBuilder;
@BeforeEach @BeforeEach
void setUp() throws Exception { protected void setUp() throws Exception {
ListDropbox testSubject = new ListDropbox() { ListDropbox testSubject = new ListDropbox() {
@Override @Override
public DbxClientV2 getDropboxApiClient(ProcessContext context, ProxyConfiguration proxyConfiguration, String clientId) { public DbxClientV2 getDropboxApiClient(ProcessContext context, String id) {
return mockDropboxClient; return mockDropboxClient;
} }
@ -104,10 +88,9 @@ public class ListDropboxTest {
testRunner = TestRunners.newTestRunner(testSubject); testRunner = TestRunners.newTestRunner(testSubject);
mockStandardDropboxCredentialService();
testRunner.setProperty(ListDropbox.RECURSIVE_SEARCH, Boolean.toString(IS_RECURSIVE)); testRunner.setProperty(ListDropbox.RECURSIVE_SEARCH, Boolean.toString(IS_RECURSIVE));
testRunner.setProperty(ListDropbox.MIN_AGE, "0 sec"); testRunner.setProperty(ListDropbox.MIN_AGE, "0 sec");
super.setUp();
} }
@Test @Test
@ -140,7 +123,7 @@ public class ListDropboxTest {
//root is listed when "" is used in Dropbox API //root is listed when "" is used in Dropbox API
when(mockDbxUserFilesRequest.listFolderBuilder("")).thenReturn(mockListFolderBuilder); when(mockDbxUserFilesRequest.listFolderBuilder("")).thenReturn(mockListFolderBuilder);
when(mockListFolderResult.getEntries()).thenReturn(singletonList( when(mockListFolderResult.getEntries()).thenReturn(singletonList(
createFileMetadata(FILENAME_1, folderName, ID_1, CREATED_TIME) createFileMetadata(FILE_ID_1, FILENAME_1, folderName, CREATED_TIME)
)); ));
testRunner.run(); testRunner.run();
@ -148,7 +131,7 @@ public class ListDropboxTest {
testRunner.assertAllFlowFilesTransferred(ListDropbox.REL_SUCCESS, 1); testRunner.assertAllFlowFilesTransferred(ListDropbox.REL_SUCCESS, 1);
List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(ListDropbox.REL_SUCCESS); List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(ListDropbox.REL_SUCCESS);
MockFlowFile ff0 = flowFiles.get(0); MockFlowFile ff0 = flowFiles.get(0);
assertFlowFileAttributes(ff0, folderName); assertOutFlowFileAttributes(ff0, folderName);
} }
@Test @Test
@ -159,9 +142,9 @@ public class ListDropboxTest {
when(mockDbxUserFilesRequest.listFolderBuilder(TEST_FOLDER)).thenReturn(mockListFolderBuilder); when(mockDbxUserFilesRequest.listFolderBuilder(TEST_FOLDER)).thenReturn(mockListFolderBuilder);
when(mockListFolderResult.getEntries()).thenReturn(Arrays.asList( when(mockListFolderResult.getEntries()).thenReturn(Arrays.asList(
createFileMetadata(FILENAME_1, TEST_FOLDER, ID_1, CREATED_TIME), createFileMetadata(FILE_ID_1, FILENAME_1, TEST_FOLDER, CREATED_TIME),
createFolderMetadata("testFolder1", TEST_FOLDER), createFolderMetadata(),
createFileMetadata(FILENAME_2, TEST_FOLDER, ID_2, CREATED_TIME, false) createFileMetadata(FILE_ID_2, FILENAME_2, TEST_FOLDER, CREATED_TIME, false)
)); ));
testRunner.run(); testRunner.run();
@ -169,7 +152,7 @@ public class ListDropboxTest {
testRunner.assertAllFlowFilesTransferred(ListDropbox.REL_SUCCESS, 1); testRunner.assertAllFlowFilesTransferred(ListDropbox.REL_SUCCESS, 1);
List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(ListDropbox.REL_SUCCESS); List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(ListDropbox.REL_SUCCESS);
MockFlowFile ff0 = flowFiles.get(0); MockFlowFile ff0 = flowFiles.get(0);
assertFlowFileAttributes(ff0, TEST_FOLDER); assertOutFlowFileAttributes(ff0);
} }
@Test @Test
@ -180,8 +163,8 @@ public class ListDropboxTest {
when(mockDbxUserFilesRequest.listFolderBuilder(TEST_FOLDER)).thenReturn(mockListFolderBuilder); when(mockDbxUserFilesRequest.listFolderBuilder(TEST_FOLDER)).thenReturn(mockListFolderBuilder);
when(mockListFolderResult.getEntries()).thenReturn(Arrays.asList( when(mockListFolderResult.getEntries()).thenReturn(Arrays.asList(
createFileMetadata(FILENAME_1, TEST_FOLDER, ID_1, CREATED_TIME), createFileMetadata(FILE_ID_1, FILENAME_1, TEST_FOLDER, CREATED_TIME),
createFileMetadata(FILENAME_2, TEST_FOLDER, ID_2, OLD_CREATED_TIME) createFileMetadata(FILE_ID_2, FILENAME_2, TEST_FOLDER, OLD_CREATED_TIME)
)); ));
testRunner.run(); testRunner.run();
@ -189,7 +172,7 @@ public class ListDropboxTest {
testRunner.assertAllFlowFilesTransferred(ListDropbox.REL_SUCCESS, 1); testRunner.assertAllFlowFilesTransferred(ListDropbox.REL_SUCCESS, 1);
List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(ListDropbox.REL_SUCCESS); List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(ListDropbox.REL_SUCCESS);
MockFlowFile ff0 = flowFiles.get(0); MockFlowFile ff0 = flowFiles.get(0);
assertFlowFileAttributes(ff0, TEST_FOLDER); assertOutFlowFileAttributes(ff0);
} }
@Test @Test
@ -201,8 +184,8 @@ public class ListDropboxTest {
when(mockDbxUserFilesRequest.listFolderBuilder(TEST_FOLDER)).thenReturn(mockListFolderBuilder); when(mockDbxUserFilesRequest.listFolderBuilder(TEST_FOLDER)).thenReturn(mockListFolderBuilder);
when(mockListFolderResult.getEntries()).thenReturn(Arrays.asList( when(mockListFolderResult.getEntries()).thenReturn(Arrays.asList(
createFileMetadata(FILENAME_1, TEST_FOLDER, ID_1, CREATED_TIME), createFileMetadata(FILE_ID_1, FILENAME_1, TEST_FOLDER, CREATED_TIME),
createFileMetadata(FILENAME_2, TEST_FOLDER, ID_2, CREATED_TIME) createFileMetadata(FILE_ID_2, FILENAME_2, TEST_FOLDER, CREATED_TIME)
)); ));
testRunner.run(); testRunner.run();
@ -216,52 +199,12 @@ public class ListDropboxTest {
assertEquals(expectedFileNames, actualFileNames); assertEquals(expectedFileNames, actualFileNames);
} }
private void assertFlowFileAttributes(MockFlowFile flowFile, String folderName) { private Metadata createFolderMetadata() {
flowFile.assertAttributeEquals(DropboxFileInfo.ID, ID_1); return FolderMetadata.newBuilder(FOLDER_ID)
flowFile.assertAttributeEquals(DropboxFileInfo.FILENAME, FILENAME_1); .withPathDisplay(TEST_FOLDER + "/" + FOLDER_ID)
flowFile.assertAttributeEquals(DropboxFileInfo.PATH, folderName);
flowFile.assertAttributeEquals(DropboxFileInfo.TIMESTAMP, Long.toString(CREATED_TIME));
flowFile.assertAttributeEquals(DropboxFileInfo.SIZE, Long.toString(SIZE));
flowFile.assertAttributeEquals(DropboxFileInfo.REVISION, REVISION);
}
private FileMetadata createFileMetadata(
String filename,
String parent,
String id,
long createdTime,
boolean isDownloadable) {
return FileMetadata.newBuilder(filename, id,
new Date(createdTime),
new Date(createdTime),
REVISION, SIZE)
.withPathDisplay(parent + "/" + filename)
.withIsDownloadable(isDownloadable)
.build(); .build();
} }
private FileMetadata createFileMetadata(
String filename,
String parent,
String id,
long createdTime) {
return createFileMetadata(filename, parent, id, createdTime, true);
}
private Metadata createFolderMetadata(String folderName, String parent) {
return FolderMetadata.newBuilder(folderName)
.withPathDisplay(parent + "/" + folderName)
.build();
}
private void mockStandardDropboxCredentialService() throws Exception {
String credentialServiceId = "dropbox_credentials";
when(credentialService.getIdentifier()).thenReturn(credentialServiceId);
testRunner.addControllerService(credentialServiceId, credentialService);
testRunner.enableControllerService(credentialService);
testRunner.setProperty(ListDropbox.CREDENTIAL_SERVICE, credentialServiceId);
}
private void mockRecordWriter() throws InitializationException { private void mockRecordWriter() throws InitializationException {
RecordSetWriterFactory recordWriter = new JsonRecordSetWriter(); RecordSetWriterFactory recordWriter = new JsonRecordSetWriter();
testRunner.addControllerService("record_writer", recordWriter); testRunner.addControllerService("record_writer", recordWriter);

View File

@ -0,0 +1,153 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.dropbox;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
public class PutDropboxIT extends AbstractDropboxIT<PutDropbox> {
private static final String CONTENT = "content";
private static final String CHANGED_CONTENT = "changedContent";
private static final String NON_EXISTING_FOLDER = "/doesNotExistYet";
@BeforeEach
public void init() throws Exception {
super.init();
testRunner.setProperty(PutDropbox.FILE_NAME, "testFile.json");
}
@AfterEach
public void teardown() throws Exception {
super.teardown();
deleteFolderIfExists(NON_EXISTING_FOLDER);
}
@Override
protected PutDropbox createTestSubject() {
return new PutDropbox();
}
@Test
void testUploadFileToExistingDirectory() {
testRunner.setProperty(PutDropbox.FOLDER, MAIN_FOLDER);
testRunner.enqueue(CONTENT);
testRunner.run();
testRunner.assertTransferCount(PutDropbox.REL_FAILURE, 0);
testRunner.assertTransferCount(PutDropbox.REL_SUCCESS, 1);
}
@Test
void testUploadFileCreateFolderWithSubFolders() {
testRunner.setProperty(PutDropbox.FOLDER, NON_EXISTING_FOLDER + "/subFolder1/subFolder2");
testRunner.enqueue(CONTENT);
testRunner.run();
testRunner.assertTransferCount(PutDropbox.REL_FAILURE, 0);
testRunner.assertTransferCount(PutDropbox.REL_SUCCESS, 1);
}
@Test
void testEmptyFileIsUpladed() {
testRunner.setProperty(PutDropbox.FOLDER, MAIN_FOLDER);
testRunner.enqueue("");
testRunner.run();
testRunner.assertTransferCount(PutDropbox.REL_FAILURE, 0);
testRunner.assertTransferCount(PutDropbox.REL_SUCCESS, 1);
}
@Test
void testUploadExistingFileFailStrategy() {
testRunner.setProperty(PutDropbox.FOLDER, MAIN_FOLDER);
testRunner.setProperty(PutDropbox.CONFLICT_RESOLUTION, PutDropbox.FAIL_RESOLUTION);
testRunner.enqueue(CONTENT);
testRunner.run();
testRunner.assertTransferCount(PutDropbox.REL_SUCCESS, 1);
testRunner.assertTransferCount(PutDropbox.REL_FAILURE, 0);
testRunner.clearTransferState();
testRunner.enqueue(CHANGED_CONTENT);
testRunner.run();
testRunner.assertTransferCount(PutDropbox.REL_SUCCESS, 0);
testRunner.assertTransferCount(PutDropbox.REL_FAILURE, 1);
}
@Test
void testUploadExistingFileWithSameContentFailStrategy() {
testRunner.setProperty(PutDropbox.FOLDER, MAIN_FOLDER);
testRunner.setProperty(PutDropbox.CONFLICT_RESOLUTION, PutDropbox.FAIL_RESOLUTION);
testRunner.enqueue(CONTENT);
testRunner.run();
testRunner.assertTransferCount(PutDropbox.REL_SUCCESS, 1);
testRunner.assertTransferCount(PutDropbox.REL_FAILURE, 0);
testRunner.clearTransferState();
testRunner.enqueue(CONTENT);
testRunner.run();
testRunner.assertTransferCount(PutDropbox.REL_SUCCESS, 0);
testRunner.assertTransferCount(PutDropbox.REL_FAILURE, 1);
}
@Test
void testUploadExistingFileOverwriteStrategy() {
testRunner.setProperty(PutDropbox.FOLDER, MAIN_FOLDER);
testRunner.setProperty(PutDropbox.CONFLICT_RESOLUTION, PutDropbox.OVERWRITE_RESOLUTION);
testRunner.enqueue(CONTENT);
testRunner.run();
testRunner.assertTransferCount(PutDropbox.REL_FAILURE, 0);
testRunner.assertTransferCount(PutDropbox.REL_SUCCESS, 1);
testRunner.clearTransferState();
testRunner.enqueue(CHANGED_CONTENT);
testRunner.run();
testRunner.assertTransferCount(PutDropbox.REL_SUCCESS, 1);
testRunner.assertTransferCount(PutDropbox.REL_FAILURE, 0);
}
@Test
void testUploadExistingFileIgnoreStrategy() {
testRunner.setProperty(PutDropbox.FOLDER, MAIN_FOLDER);
testRunner.setProperty(PutDropbox.CONFLICT_RESOLUTION, PutDropbox.IGNORE_RESOLUTION);
testRunner.enqueue(CONTENT);
testRunner.run();
testRunner.assertTransferCount(PutDropbox.REL_FAILURE, 0);
testRunner.assertTransferCount(PutDropbox.REL_SUCCESS, 1);
testRunner.clearTransferState();
testRunner.enqueue(CHANGED_CONTENT);
testRunner.run();
testRunner.assertTransferCount(PutDropbox.REL_SUCCESS, 1);
testRunner.assertTransferCount(PutDropbox.REL_FAILURE, 0);
}
}

View File

@ -0,0 +1,333 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.dropbox;
import static com.dropbox.core.v2.files.UploadError.path;
import static com.dropbox.core.v2.files.WriteConflictError.FILE;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.nifi.processors.dropbox.DropboxAttributes.ERROR_MESSAGE;
import static org.mockito.Answers.RETURNS_DEEP_STUBS;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import com.dropbox.core.DbxException;
import com.dropbox.core.LocalizedText;
import com.dropbox.core.v2.DbxClientV2;
import com.dropbox.core.v2.files.CommitInfo;
import com.dropbox.core.v2.files.DbxUserFilesRequests;
import com.dropbox.core.v2.files.UploadErrorException;
import com.dropbox.core.v2.files.UploadSessionAppendV2Uploader;
import com.dropbox.core.v2.files.UploadSessionCursor;
import com.dropbox.core.v2.files.UploadSessionFinishUploader;
import com.dropbox.core.v2.files.UploadSessionStartResult;
import com.dropbox.core.v2.files.UploadSessionStartUploader;
import com.dropbox.core.v2.files.UploadUploader;
import com.dropbox.core.v2.files.UploadWriteFailed;
import com.dropbox.core.v2.files.WriteError;
import com.dropbox.core.v2.files.WriteMode;
import java.io.InputStream;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
@ExtendWith(MockitoExtension.class)
public class PutDropboxTest extends AbstractDropboxTest {
public static final long CHUNKED_UPLOAD_SIZE_IN_BYTES = 8;
public static final long CHUNKED_UPLOAD_THRESHOLD_IN_BYTES = 15;
public static final String CONTENT = "1234567890";
public static final String LARGE_CONTENT_30B = "123456789012345678901234567890";
public static final String SESSION_ID = "sessionId";
@Mock(answer = RETURNS_DEEP_STUBS)
private DbxUserFilesRequests mockDbxUserFilesRequest;
@Mock
private UploadUploader mockUploadUploader;
@Mock
private UploadSessionStartUploader mockUploadSessionStartUploader;
@Mock
private UploadSessionStartResult mockUploadSessionStartResult;
@Mock
private UploadSessionAppendV2Uploader mockUploadSessionAppendV2Uploader;
@Mock
private UploadSessionFinishUploader mockUploadSessionFinishUploader;
@BeforeEach
protected void setUp() throws Exception {
final PutDropbox testSubject = new PutDropbox() {
@Override
public DbxClientV2 getDropboxApiClient(ProcessContext context, String id) {
return mockDropboxClient;
}
};
testRunner = TestRunners.newTestRunner(testSubject);
testRunner.setProperty(PutDropbox.FOLDER, TEST_FOLDER);
super.setUp();
}
@Test
void testFolderValidity() {
testRunner.setProperty(PutDropbox.FOLDER, "/");
testRunner.assertValid();
testRunner.setProperty(PutDropbox.FOLDER, "/tempFolder");
testRunner.assertValid();
}
@Test
void testUploadChunkSizeValidity() {
testRunner.setProperty(PutDropbox.CHUNKED_UPLOAD_SIZE, "");
testRunner.assertNotValid();
testRunner.setProperty(PutDropbox.CHUNKED_UPLOAD_SIZE, "40 MB");
testRunner.assertValid();
testRunner.setProperty(PutDropbox.CHUNKED_UPLOAD_SIZE, "152 MB");
testRunner.assertNotValid();
testRunner.setProperty(PutDropbox.CHUNKED_UPLOAD_SIZE, "1024");
testRunner.assertNotValid();
}
@Test
void testFileUploadFileNameFromProperty() throws Exception {
testRunner.setProperty(PutDropbox.FILE_NAME, FILENAME_1);
mockFileUpload(TEST_FOLDER, FILENAME_1);
runWithFlowFile();
testRunner.assertAllFlowFilesTransferred(PutDropbox.REL_SUCCESS, 1);
List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(PutDropbox.REL_SUCCESS);
MockFlowFile ff0 = flowFiles.get(0);
assertOutFlowFileAttributes(ff0);
assertProvenanceEvent(ProvenanceEventType.SEND);
}
@Test
void testFileUploadFileNameFromFlowFileAttribute() throws Exception {
mockFileUpload(TEST_FOLDER, FILENAME_2);
final MockFlowFile mockFlowFile = getMockFlowFile(CONTENT);
final Map<String, String> attributes = new HashMap<>();
attributes.put("filename", FILENAME_2);
mockFlowFile.putAttributes(attributes);
testRunner.enqueue(mockFlowFile);
testRunner.run();
testRunner.assertAllFlowFilesTransferred(PutDropbox.REL_SUCCESS, 1);
assertProvenanceEvent(ProvenanceEventType.SEND);
}
@Test
void testFileUploadFileToRoot() throws Exception {
testRunner.setProperty(PutDropbox.FOLDER, "/");
testRunner.setProperty(PutDropbox.FILE_NAME, FILENAME_1);
mockFileUpload("/", FILENAME_1);
runWithFlowFile();
testRunner.assertAllFlowFilesTransferred(PutDropbox.REL_SUCCESS, 1);
assertProvenanceEvent(ProvenanceEventType.SEND);
List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(PutDropbox.REL_SUCCESS);
MockFlowFile ff0 = flowFiles.get(0);
assertOutFlowFileAttributes(ff0, "/");
}
@Test
void testFileUploadWithOverwriteConflictResolutionStrategy() throws Exception {
testRunner.setProperty(PutDropbox.FILE_NAME, FILENAME_1);
testRunner.setProperty(PutDropbox.CONFLICT_RESOLUTION, PutDropbox.OVERWRITE_RESOLUTION);
mockFileUpload(TEST_FOLDER, FILENAME_1, WriteMode.OVERWRITE);
runWithFlowFile();
testRunner.assertAllFlowFilesTransferred(PutDropbox.REL_SUCCESS, 1);
assertProvenanceEvent(ProvenanceEventType.SEND);
}
@Test
void testFileUploadError() throws Exception {
testRunner.setProperty(PutDropbox.FILE_NAME, FILENAME_1);
mockFileUploadError(new DbxException("Dropbox error"));
runWithFlowFile();
testRunner.assertAllFlowFilesTransferred(PutDropbox.REL_FAILURE, 1);
List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(PutDropbox.REL_FAILURE);
MockFlowFile ff0 = flowFiles.get(0);
ff0.assertAttributeEquals(ERROR_MESSAGE, "Dropbox error");
assertNoProvenanceEvent();
}
@Test
void testFileUploadOtherExceptionIsNotIgnored() throws Exception {
testRunner.setProperty(PutDropbox.FILE_NAME, FILENAME_1);
testRunner.setProperty(PutDropbox.CONFLICT_RESOLUTION, PutDropbox.IGNORE_RESOLUTION);
mockFileUploadError(getException(WriteError.INSUFFICIENT_SPACE));
runWithFlowFile();
testRunner.assertAllFlowFilesTransferred(PutDropbox.REL_FAILURE, 1);
assertNoProvenanceEvent();
}
@Test
void testFileUploadConflictIgnoredWithIgnoreResolutionStrategy() throws Exception {
testRunner.setProperty(PutDropbox.FILE_NAME, FILENAME_1);
testRunner.setProperty(PutDropbox.CONFLICT_RESOLUTION, PutDropbox.IGNORE_RESOLUTION);
mockFileUploadError(getException(WriteError.conflict(FILE)));
runWithFlowFile();
testRunner.assertAllFlowFilesTransferred(PutDropbox.REL_SUCCESS, 1);
assertNoProvenanceEvent();
}
@Test
void testFileUploadConflictNotIgnoredWithDefaultFailStrategy() throws Exception {
testRunner.setProperty(PutDropbox.FILE_NAME, FILENAME_1);
mockFileUploadError(getException(WriteError.conflict(FILE)));
runWithFlowFile();
testRunner.assertAllFlowFilesTransferred(PutDropbox.REL_FAILURE, 1);
assertNoProvenanceEvent();
}
@Test
void testFileUploadLargeFile() throws Exception {
MockFlowFile mockFlowFile = getMockFlowFile(LARGE_CONTENT_30B);
testRunner.setProperty(PutDropbox.FILE_NAME, FILENAME_1);
testRunner.setProperty(PutDropbox.CHUNKED_UPLOAD_SIZE, CHUNKED_UPLOAD_SIZE_IN_BYTES + " B");
testRunner.setProperty(PutDropbox.CHUNKED_UPLOAD_THRESHOLD, CHUNKED_UPLOAD_THRESHOLD_IN_BYTES + " B");
when(mockDropboxClient.files())
.thenReturn(mockDbxUserFilesRequest);
//start session: 8 bytes uploaded
when(mockDbxUserFilesRequest
.uploadSessionStart())
.thenReturn(mockUploadSessionStartUploader);
when(mockUploadSessionStartUploader
.uploadAndFinish(any(InputStream.class), eq(CHUNKED_UPLOAD_SIZE_IN_BYTES)))
.thenReturn(mockUploadSessionStartResult);
when(mockUploadSessionStartResult
.getSessionId())
.thenReturn(SESSION_ID);
//append session: invoked twice, 2 * 8 bytes uploaded
when(mockDbxUserFilesRequest
.uploadSessionAppendV2(any(UploadSessionCursor.class)))
.thenReturn(mockUploadSessionAppendV2Uploader);
//finish session: 30 - 8 - 2 * 8 = 6 bytes uploaded
CommitInfo commitInfo = CommitInfo.newBuilder(getPath(TEST_FOLDER , FILENAME_1))
.withMode(WriteMode.ADD)
.withStrictConflict(true)
.withClientModified(new Date(mockFlowFile.getEntryDate()))
.build();
when(mockDbxUserFilesRequest
.uploadSessionFinish(any(UploadSessionCursor.class), eq(commitInfo)))
.thenReturn(mockUploadSessionFinishUploader);
when(mockUploadSessionFinishUploader
.uploadAndFinish(any(InputStream.class), eq(6L)))
.thenReturn(createFileMetadata(FILE_ID_1, FILENAME_1, TEST_FOLDER, CREATED_TIME));
testRunner.enqueue(mockFlowFile);
testRunner.run();
testRunner.assertAllFlowFilesTransferred(PutDropbox.REL_SUCCESS, 1);
verify(mockUploadSessionAppendV2Uploader, times(2))
.uploadAndFinish(any(InputStream.class), eq(CHUNKED_UPLOAD_SIZE_IN_BYTES));
assertProvenanceEvent(ProvenanceEventType.SEND);
}
private void mockFileUpload(String folder, String filename) throws Exception {
mockFileUpload(folder, filename, WriteMode.ADD);
}
private void mockFileUpload(String folder, String filename, WriteMode writeMode) throws Exception {
when(mockDropboxClient.files())
.thenReturn(mockDbxUserFilesRequest);
when(mockDbxUserFilesRequest
.uploadBuilder(getPath(folder, filename))
.withMode(writeMode)
.withStrictConflict(true)
.start())
.thenReturn(mockUploadUploader);
when(mockUploadUploader
.uploadAndFinish(any(InputStream.class)))
.thenReturn(createFileMetadata(FILE_ID_1, filename, folder, CREATED_TIME));
}
private void mockFileUploadError(DbxException exception) throws Exception {
when(mockDropboxClient.files())
.thenReturn(mockDbxUserFilesRequest);
when(mockDbxUserFilesRequest
.uploadBuilder(getPath(TEST_FOLDER, FILENAME_1))
.withMode(WriteMode.ADD)
.withStrictConflict(true)
.start())
.thenReturn(mockUploadUploader);
when(mockUploadUploader
.uploadAndFinish(any(InputStream.class)))
.thenThrow(exception);
}
private UploadErrorException getException(WriteError writeErrorReason) {
return new UploadErrorException("route", "requestId", new LocalizedText("upload error", "en-us"),
path(new UploadWriteFailed(writeErrorReason, "uploadSessionId")));
}
private MockFlowFile getMockFlowFile(String content) {
MockFlowFile inputFlowFile = new MockFlowFile(0);
inputFlowFile.setData(content.getBytes(UTF_8));
return inputFlowFile;
}
private void runWithFlowFile() {
MockFlowFile mockFlowFile = getMockFlowFile(CONTENT);
testRunner.enqueue(mockFlowFile);
testRunner.run();
}
}