mirror of https://github.com/apache/nifi.git
NIFI-10444 FetchDropbox processor
NIFI-10444 Add ProxyConfiguration to ListDropbox and FetchDropbox processors This closes #6401. Signed-off-by: Peter Turcsanyi <turcsanyi@apache.org>
This commit is contained in:
parent
ec09c56e93
commit
111c7ac0a4
|
@ -39,6 +39,10 @@
|
|||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-record</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-proxy-configuration-api</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-listed-entity</artifactId>
|
||||
|
@ -55,6 +59,10 @@
|
|||
<artifactId>dropbox-core-sdk</artifactId>
|
||||
<version>${dropbox.client.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.squareup.okhttp3</groupId>
|
||||
<artifactId>okhttp</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-mock</artifactId>
|
||||
|
|
|
@ -0,0 +1,73 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.processors.dropbox;
|
||||
|
||||
import com.dropbox.core.DbxRequestConfig;
|
||||
import com.dropbox.core.http.HttpRequestor;
|
||||
import com.dropbox.core.http.OkHttp3Requestor;
|
||||
import com.dropbox.core.oauth.DbxCredential;
|
||||
import com.dropbox.core.v2.DbxClientV2;
|
||||
import java.net.Proxy;
|
||||
import okhttp3.Credentials;
|
||||
import okhttp3.OkHttpClient;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.dropbox.credentials.service.DropboxCredentialDetails;
|
||||
import org.apache.nifi.dropbox.credentials.service.DropboxCredentialService;
|
||||
import org.apache.nifi.processor.ProcessContext;
|
||||
import org.apache.nifi.proxy.ProxyConfiguration;
|
||||
|
||||
public interface DropboxTrait {
|
||||
|
||||
PropertyDescriptor CREDENTIAL_SERVICE = new PropertyDescriptor.Builder()
|
||||
.name("dropbox-credential-service")
|
||||
.displayName("Dropbox Credential Service")
|
||||
.description("Controller Service used to obtain Dropbox credentials (App Key, App Secret, Access Token, Refresh Token)." +
|
||||
" See controller service's Additional Details for more information.")
|
||||
.identifiesControllerService(DropboxCredentialService.class)
|
||||
.required(true)
|
||||
.build();
|
||||
|
||||
|
||||
default DbxClientV2 getDropboxApiClient(ProcessContext context, ProxyConfiguration proxyConfiguration, String clientId) {
|
||||
OkHttpClient.Builder okHttpClientBuilder = OkHttp3Requestor.defaultOkHttpClientBuilder();
|
||||
|
||||
if (!Proxy.Type.DIRECT.equals(proxyConfiguration.getProxyType())) {
|
||||
okHttpClientBuilder.proxy(proxyConfiguration.createProxy());
|
||||
|
||||
if (proxyConfiguration.hasCredential()) {
|
||||
okHttpClientBuilder.proxyAuthenticator((route, response) -> {
|
||||
final String credential = Credentials.basic(proxyConfiguration.getProxyUserName(), proxyConfiguration.getProxyUserPassword());
|
||||
return response.request().newBuilder()
|
||||
.header("Proxy-Authorization", credential)
|
||||
.build();
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
HttpRequestor httpRequestor = new OkHttp3Requestor(okHttpClientBuilder.build());
|
||||
DbxRequestConfig config = DbxRequestConfig.newBuilder(clientId)
|
||||
.withHttpRequestor(httpRequestor)
|
||||
.build();
|
||||
|
||||
final DropboxCredentialService credentialService = context.getProperty(CREDENTIAL_SERVICE)
|
||||
.asControllerService(DropboxCredentialService.class);
|
||||
DropboxCredentialDetails credential = credentialService.getDropboxCredential();
|
||||
|
||||
return new DbxClientV2(config, new DbxCredential(credential.getAccessToken(), -1L,
|
||||
credential.getRefreshToken(), credential.getAppKey(), credential.getAppSecret()));
|
||||
}
|
||||
}
|
|
@ -0,0 +1,151 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.processors.dropbox;
|
||||
|
||||
import static java.lang.String.format;
|
||||
|
||||
import com.dropbox.core.DbxException;
|
||||
import com.dropbox.core.v2.DbxClientV2;
|
||||
import java.io.InputStream;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.regex.Pattern;
|
||||
import org.apache.nifi.annotation.behavior.InputRequirement;
|
||||
import org.apache.nifi.annotation.behavior.WritesAttribute;
|
||||
import org.apache.nifi.annotation.behavior.WritesAttributes;
|
||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||
import org.apache.nifi.annotation.documentation.SeeAlso;
|
||||
import org.apache.nifi.annotation.documentation.Tags;
|
||||
import org.apache.nifi.annotation.lifecycle.OnScheduled;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.expression.ExpressionLanguageScope;
|
||||
import org.apache.nifi.flowfile.FlowFile;
|
||||
import org.apache.nifi.processor.AbstractProcessor;
|
||||
import org.apache.nifi.processor.ProcessContext;
|
||||
import org.apache.nifi.processor.ProcessSession;
|
||||
import org.apache.nifi.processor.Relationship;
|
||||
import org.apache.nifi.processor.exception.ProcessException;
|
||||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
import org.apache.nifi.proxy.ProxyConfiguration;
|
||||
import org.apache.nifi.proxy.ProxySpec;
|
||||
|
||||
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
|
||||
@Tags({"dropbox", "storage", "fetch"})
|
||||
@CapabilityDescription("Fetches files from Dropbox. Designed to be used in tandem with ListDropbox.")
|
||||
@WritesAttribute(attribute = "error.message", description = "When a FlowFile is routed to 'failure', this attribute is added indicating why the file could "
|
||||
+ "not be fetched from Dropbox.")
|
||||
@SeeAlso(ListDropbox.class)
|
||||
@WritesAttributes(
|
||||
@WritesAttribute(attribute = FetchDropbox.ERROR_MESSAGE_ATTRIBUTE, description = "The error message returned by Dropbox when the fetch of a file fails."))
|
||||
public class FetchDropbox extends AbstractProcessor implements DropboxTrait {
|
||||
|
||||
public static final String ERROR_MESSAGE_ATTRIBUTE = "error.message";
|
||||
|
||||
public static final PropertyDescriptor FILE = new PropertyDescriptor
|
||||
.Builder().name("file")
|
||||
.displayName("File")
|
||||
.description("The Dropbox identifier or path of the Dropbox file to fetch." +
|
||||
" The 'File' should match the following regular expression pattern: /.*|id:.* ." +
|
||||
" When ListDropbox is used for input, either '${dropbox.id}' (identifying files by Dropbox id)" +
|
||||
" or '${path}/${filename}' (identifying files by path) can be used as 'File' value.")
|
||||
.required(true)
|
||||
.defaultValue("${dropbox.id}")
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
|
||||
.addValidator(StandardValidators.createRegexMatchingValidator(
|
||||
Pattern.compile("/.*|id:.*")))
|
||||
.build();
|
||||
|
||||
public static final Relationship REL_SUCCESS =
|
||||
new Relationship.Builder()
|
||||
.name("success")
|
||||
.description("A FlowFile will be routed here for each successfully fetched File.")
|
||||
.build();
|
||||
|
||||
public static final Relationship REL_FAILURE =
|
||||
new Relationship.Builder().name("failure")
|
||||
.description("A FlowFile will be routed here for each File for which fetch was attempted but failed.")
|
||||
.build();
|
||||
|
||||
public static final Set<Relationship> relationships = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
|
||||
REL_SUCCESS,
|
||||
REL_FAILURE
|
||||
)));
|
||||
|
||||
private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
|
||||
CREDENTIAL_SERVICE,
|
||||
FILE,
|
||||
ProxyConfiguration.createProxyConfigPropertyDescriptor(false, ProxySpec.HTTP_AUTH)
|
||||
));
|
||||
|
||||
private DbxClientV2 dropboxApiClient;
|
||||
|
||||
@Override
|
||||
public Set<Relationship> getRelationships() {
|
||||
return relationships;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||
return PROPERTIES;
|
||||
}
|
||||
|
||||
@OnScheduled
|
||||
public void onScheduled(final ProcessContext context) {
|
||||
final ProxyConfiguration proxyConfiguration = ProxyConfiguration.getConfiguration(context);
|
||||
String dropboxClientId = format("%s-%s", getClass().getSimpleName(), getIdentifier());
|
||||
dropboxApiClient = getDropboxApiClient(context, proxyConfiguration, dropboxClientId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
|
||||
FlowFile flowFile = session.get();
|
||||
if (flowFile == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
String fileIdentifier = context.getProperty(FILE).evaluateAttributeExpressions(flowFile).getValue();
|
||||
fileIdentifier = correctFilePath(fileIdentifier);
|
||||
|
||||
FlowFile outFlowFile = flowFile;
|
||||
try {
|
||||
fetchFile(fileIdentifier, session, outFlowFile);
|
||||
session.transfer(outFlowFile, REL_SUCCESS);
|
||||
} catch (Exception e) {
|
||||
handleError(session, flowFile, fileIdentifier, e);
|
||||
}
|
||||
}
|
||||
|
||||
private void fetchFile(String fileId, ProcessSession session, FlowFile outFlowFile) throws DbxException {
|
||||
InputStream dropboxInputStream = dropboxApiClient.files()
|
||||
.download(fileId)
|
||||
.getInputStream();
|
||||
session.importFrom(dropboxInputStream, outFlowFile);
|
||||
}
|
||||
|
||||
private void handleError(ProcessSession session, FlowFile flowFile, String fileId, Exception e) {
|
||||
getLogger().error("Error while fetching and processing file with id '{}'", fileId, e);
|
||||
FlowFile outFlowFile = session.putAttribute(flowFile, ERROR_MESSAGE_ATTRIBUTE, e.getMessage());
|
||||
session.transfer(outFlowFile, REL_FAILURE);
|
||||
}
|
||||
|
||||
private String correctFilePath(String folderName) {
|
||||
return folderName.startsWith("//") ? folderName.replaceFirst("//", "/") : folderName;
|
||||
}
|
||||
}
|
|
@ -20,8 +20,6 @@ import static java.lang.String.format;
|
|||
import static java.util.stream.Collectors.toList;
|
||||
|
||||
import com.dropbox.core.DbxException;
|
||||
import com.dropbox.core.DbxRequestConfig;
|
||||
import com.dropbox.core.oauth.DbxCredential;
|
||||
import com.dropbox.core.v2.DbxClientV2;
|
||||
import com.dropbox.core.v2.files.FileMetadata;
|
||||
import com.dropbox.core.v2.files.ListFolderBuilder;
|
||||
|
@ -45,25 +43,26 @@ import org.apache.nifi.annotation.behavior.TriggerSerially;
|
|||
import org.apache.nifi.annotation.behavior.WritesAttribute;
|
||||
import org.apache.nifi.annotation.behavior.WritesAttributes;
|
||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||
import org.apache.nifi.annotation.documentation.SeeAlso;
|
||||
import org.apache.nifi.annotation.documentation.Tags;
|
||||
import org.apache.nifi.annotation.lifecycle.OnScheduled;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.components.state.Scope;
|
||||
import org.apache.nifi.context.PropertyContext;
|
||||
import org.apache.nifi.dropbox.credentials.service.DropboxCredentialDetails;
|
||||
import org.apache.nifi.dropbox.credentials.service.DropboxCredentialService;
|
||||
import org.apache.nifi.expression.ExpressionLanguageScope;
|
||||
import org.apache.nifi.processor.ProcessContext;
|
||||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
import org.apache.nifi.processor.util.list.AbstractListProcessor;
|
||||
import org.apache.nifi.processor.util.list.ListedEntityTracker;
|
||||
import org.apache.nifi.proxy.ProxyConfiguration;
|
||||
import org.apache.nifi.proxy.ProxySpec;
|
||||
import org.apache.nifi.serialization.record.RecordSchema;
|
||||
|
||||
@PrimaryNodeOnly
|
||||
@TriggerSerially
|
||||
@Tags({"dropbox", "storage"})
|
||||
@CapabilityDescription("Retrieves a listing of files from Dropbox (shortcuts are ignored)." +
|
||||
" Each listed file may result in one FlowFile, the metadata being written as Flowfile attributes." +
|
||||
" Each listed file may result in one FlowFile, the metadata being written as FlowFile attributes." +
|
||||
" When the 'Record Writer' property is set, the entire result is written as records to a single FlowFile." +
|
||||
" This Processor is designed to run on Primary Node only in a cluster. If the primary node changes, the new Primary Node will pick up where the" +
|
||||
" previous node left off without duplicating all of the data.")
|
||||
|
@ -76,8 +75,8 @@ import org.apache.nifi.serialization.record.RecordSchema;
|
|||
@WritesAttribute(attribute = DropboxFileInfo.REVISION, description = "Revision of the file")})
|
||||
@Stateful(scopes = {Scope.CLUSTER}, description = "The processor stores necessary data to be able to keep track what files have been listed already. " +
|
||||
"What exactly needs to be stored depends on the 'Listing Strategy'.")
|
||||
public class ListDropbox extends AbstractListProcessor<DropboxFileInfo> {
|
||||
|
||||
@SeeAlso(FetchDropbox.class)
|
||||
public class ListDropbox extends AbstractListProcessor<DropboxFileInfo> implements DropboxTrait {
|
||||
public static final PropertyDescriptor FOLDER = new PropertyDescriptor.Builder()
|
||||
.name("folder")
|
||||
.displayName("Folder")
|
||||
|
@ -109,15 +108,6 @@ public class ListDropbox extends AbstractListProcessor<DropboxFileInfo> {
|
|||
.defaultValue("0 sec")
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor CREDENTIAL_SERVICE = new PropertyDescriptor.Builder()
|
||||
.name("dropbox-credential-service")
|
||||
.displayName("Dropbox Credential Service")
|
||||
.description("Controller Service used to obtain Dropbox credentials (App Key, App Secret, Access Token, Refresh Token)." +
|
||||
" See controller service's Additional Details for more information.")
|
||||
.identifiesControllerService(DropboxCredentialService.class)
|
||||
.required(true)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor LISTING_STRATEGY = new PropertyDescriptor.Builder()
|
||||
.fromPropertyDescriptor(AbstractListProcessor.LISTING_STRATEGY)
|
||||
.allowableValues(BY_TIMESTAMPS, BY_ENTITIES, BY_TIME_WINDOW, NO_TRACKING)
|
||||
|
@ -147,14 +137,17 @@ public class ListDropbox extends AbstractListProcessor<DropboxFileInfo> {
|
|||
TRACKING_STATE_CACHE,
|
||||
TRACKING_TIME_WINDOW,
|
||||
INITIAL_LISTING_TARGET,
|
||||
RECORD_WRITER
|
||||
RECORD_WRITER,
|
||||
ProxyConfiguration.createProxyConfigPropertyDescriptor(false, ProxySpec.HTTP_AUTH)
|
||||
));
|
||||
|
||||
private DbxClientV2 dropboxApiClient;
|
||||
|
||||
@OnScheduled
|
||||
public void onScheduled(final ProcessContext context) {
|
||||
dropboxApiClient = getDropboxApiClient(context);
|
||||
final ProxyConfiguration proxyConfiguration = ProxyConfiguration.getConfiguration(context);
|
||||
String dropboxClientId = format("%s-%s", getClass().getSimpleName(), getIdentifier());
|
||||
dropboxApiClient = getDropboxApiClient(context, proxyConfiguration, dropboxClientId);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -181,15 +174,6 @@ public class ListDropbox extends AbstractListProcessor<DropboxFileInfo> {
|
|||
return context.getProperty(FOLDER).evaluateAttributeExpressions().getValue();
|
||||
}
|
||||
|
||||
protected DbxClientV2 getDropboxApiClient(ProcessContext context) {
|
||||
final DropboxCredentialService credentialService = context.getProperty(CREDENTIAL_SERVICE)
|
||||
.asControllerService(DropboxCredentialService.class);
|
||||
DbxRequestConfig config = new DbxRequestConfig(format("%s-%s", getClass().getSimpleName(), getIdentifier()));
|
||||
DropboxCredentialDetails credential = credentialService.getDropboxCredential();
|
||||
return new DbxClientV2(config, new DbxCredential(credential.getAccessToken(), -1L,
|
||||
credential.getRefreshToken(), credential.getAppKey(), credential.getAppSecret()));
|
||||
}
|
||||
|
||||
@Override
|
||||
protected List<DropboxFileInfo> performListing(ProcessContext context, Long minTimestamp,
|
||||
ListingMode listingMode) throws IOException {
|
||||
|
|
|
@ -13,3 +13,4 @@
|
|||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
org.apache.nifi.processors.dropbox.ListDropbox
|
||||
org.apache.nifi.processors.dropbox.FetchDropbox
|
||||
|
|
|
@ -99,9 +99,9 @@ public abstract class AbstractDropboxIT<T extends Processor> {
|
|||
}
|
||||
}
|
||||
|
||||
protected FileMetadata createFile(String name, String fileContent, String folder) throws Exception {
|
||||
protected FileMetadata createFile(String folder, String filename, String fileContent) throws Exception {
|
||||
ByteArrayInputStream content = new ByteArrayInputStream(fileContent.getBytes(StandardCharsets.UTF_8));
|
||||
return client.files().upload(folder + "/" + name).uploadAndFinish(content);
|
||||
return client.files().upload(folder.equals("/") ? "/" + filename : folder + "/" + filename).uploadAndFinish(content);
|
||||
}
|
||||
|
||||
private String createFolder(String path) throws Exception {
|
||||
|
|
|
@ -0,0 +1,132 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.nifi.processors.dropbox;
|
||||
|
||||
import com.dropbox.core.v2.files.FileMetadata;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import org.apache.nifi.util.MockFlowFile;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
public class FetchDropboxIT extends AbstractDropboxIT<FetchDropbox> {
|
||||
|
||||
private static final String DEFAULT_FILE_CONTENT = "test_file_content1";
|
||||
|
||||
@BeforeEach
|
||||
public void init() throws Exception {
|
||||
super.init();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected FetchDropbox createTestSubject() {
|
||||
return new FetchDropbox();
|
||||
}
|
||||
|
||||
@Test
|
||||
void testFetchFileById() throws Exception {
|
||||
FileMetadata file = createFile("/testFolder", "test_file1", DEFAULT_FILE_CONTENT);
|
||||
|
||||
Map<String, String> inputFlowFileAttributes = new HashMap<>();
|
||||
inputFlowFileAttributes.put("dropbox.id", file.getId());
|
||||
|
||||
testRunner.enqueue("unimportant_data", inputFlowFileAttributes);
|
||||
testRunner.run();
|
||||
|
||||
testRunner.assertTransferCount(FetchDropbox.REL_FAILURE, 0);
|
||||
testRunner.assertTransferCount(FetchDropbox.REL_SUCCESS, 1);
|
||||
|
||||
MockFlowFile flowFile = testRunner.getFlowFilesForRelationship(FetchDropbox.REL_SUCCESS).get(0);
|
||||
flowFile.assertContentEquals(DEFAULT_FILE_CONTENT);
|
||||
}
|
||||
|
||||
@Test
|
||||
void testFetchFileByPath() throws Exception {
|
||||
createFile("/testFolder", "test_file1", DEFAULT_FILE_CONTENT);
|
||||
|
||||
Map<String, String> inputFlowFileAttributes = new HashMap<>();
|
||||
inputFlowFileAttributes.put("path", "/testFolder");
|
||||
inputFlowFileAttributes.put("filename", "test_file1");
|
||||
|
||||
testRunner.setProperty(FetchDropbox.FILE, "${path}/${filename}");
|
||||
|
||||
testRunner.enqueue("unimportant_data", inputFlowFileAttributes);
|
||||
testRunner.run();
|
||||
|
||||
testRunner.assertTransferCount(FetchDropbox.REL_FAILURE, 0);
|
||||
testRunner.assertTransferCount(FetchDropbox.REL_SUCCESS, 1);
|
||||
List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(FetchDropbox.REL_SUCCESS);
|
||||
MockFlowFile ff0 = flowFiles.get(0);
|
||||
ff0.assertContentEquals(DEFAULT_FILE_CONTENT);
|
||||
}
|
||||
|
||||
@Test
|
||||
void testFetchFileFromRootByPath() throws Exception {
|
||||
createFile("/", "test_file1", DEFAULT_FILE_CONTENT);
|
||||
|
||||
Map<String, String> inputFlowFileAttributes = new HashMap<>();
|
||||
inputFlowFileAttributes.put("path", "/");
|
||||
inputFlowFileAttributes.put("filename", "test_file1");
|
||||
|
||||
testRunner.setProperty(FetchDropbox.FILE, "${path}/${filename}");
|
||||
|
||||
testRunner.enqueue("unimportant_data", inputFlowFileAttributes);
|
||||
testRunner.run();
|
||||
|
||||
testRunner.assertTransferCount(FetchDropbox.REL_FAILURE, 0);
|
||||
testRunner.assertTransferCount(FetchDropbox.REL_SUCCESS, 1);
|
||||
List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(FetchDropbox.REL_SUCCESS);
|
||||
MockFlowFile ff0 = flowFiles.get(0);
|
||||
ff0.assertContentEquals(DEFAULT_FILE_CONTENT);
|
||||
}
|
||||
|
||||
@Test
|
||||
void testFetchingFolderFails() {
|
||||
Map<String, String> inputFlowFileAttributes = new HashMap<>();
|
||||
inputFlowFileAttributes.put("path", "/testFolder");
|
||||
|
||||
testRunner.setProperty(FetchDropbox.FILE, "${path}");
|
||||
|
||||
testRunner.enqueue("unimportant_data", inputFlowFileAttributes);
|
||||
testRunner.run();
|
||||
|
||||
testRunner.assertTransferCount(FetchDropbox.REL_FAILURE, 1);
|
||||
testRunner.assertTransferCount(FetchDropbox.REL_SUCCESS, 0);
|
||||
|
||||
List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(FetchDropbox.REL_FAILURE);
|
||||
MockFlowFile ff0 = flowFiles.get(0);
|
||||
ff0.assertAttributeEquals("error.message", "Exception in 2/files/download: {\".tag\":\"path\",\"path\":\"not_file\"}");
|
||||
}
|
||||
|
||||
@Test
|
||||
void testFetchingFileByWrongIdFails() {
|
||||
Map<String, String> inputFlowFileAttributes = new HashMap<>();
|
||||
inputFlowFileAttributes.put("dropbox.id", "id:missing");
|
||||
|
||||
testRunner.enqueue("unimportant_data", inputFlowFileAttributes);
|
||||
testRunner.run();
|
||||
|
||||
testRunner.assertTransferCount(FetchDropbox.REL_SUCCESS, 0);
|
||||
testRunner.assertTransferCount(FetchDropbox.REL_FAILURE, 1);
|
||||
|
||||
List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(FetchDropbox.REL_FAILURE);
|
||||
MockFlowFile ff0 = flowFiles.get(0);
|
||||
ff0.assertAttributeEquals("error.message", "Exception in 2/files/download: {\".tag\":\"path\",\"path\":\"not_found\"}");
|
||||
}
|
||||
}
|
|
@ -0,0 +1,169 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.processors.dropbox;
|
||||
|
||||
import static java.nio.charset.StandardCharsets.UTF_8;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import com.dropbox.core.DbxDownloader;
|
||||
import com.dropbox.core.DbxException;
|
||||
import com.dropbox.core.v2.DbxClientV2;
|
||||
import com.dropbox.core.v2.files.DbxUserFilesRequests;
|
||||
import com.dropbox.core.v2.files.FileMetadata;
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import org.apache.nifi.dropbox.credentials.service.DropboxCredentialService;
|
||||
import org.apache.nifi.processor.ProcessContext;
|
||||
import org.apache.nifi.proxy.ProxyConfiguration;
|
||||
import org.apache.nifi.reporting.InitializationException;
|
||||
import org.apache.nifi.util.MockFlowFile;
|
||||
import org.apache.nifi.util.TestRunner;
|
||||
import org.apache.nifi.util.TestRunners;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.junit.jupiter.MockitoExtension;
|
||||
|
||||
@ExtendWith(MockitoExtension.class)
|
||||
public class FetchDropboxTest {
|
||||
|
||||
public static final String FILE_ID_1 = "id:odTlUvbpIEAAAAAAAAAGGQ";
|
||||
public static final String FILE_ID_2 = "id:odTlUvbpIEBBBBBBBBBGGQ";
|
||||
public static final String FILENAME = "file_name";
|
||||
public static final String FOLDER = "/testFolder";
|
||||
public static final String SIZE = "125";
|
||||
public static final String CREATED_TIME = "1659707000";
|
||||
public static final String REVISION = "5e4ddb1320676a5c29261";
|
||||
|
||||
private TestRunner testRunner;
|
||||
|
||||
@Mock
|
||||
private DbxClientV2 mockDropboxClient;
|
||||
|
||||
@Mock
|
||||
private DropboxCredentialService credentialService;
|
||||
|
||||
@Mock
|
||||
private DbxUserFilesRequests mockDbxUserFilesRequest;
|
||||
|
||||
|
||||
@Mock
|
||||
private DbxDownloader<FileMetadata> mockDbxDownloader;
|
||||
|
||||
@BeforeEach
|
||||
void setUp() throws Exception {
|
||||
FetchDropbox testSubject = new FetchDropbox() {
|
||||
@Override
|
||||
public DbxClientV2 getDropboxApiClient(ProcessContext context, ProxyConfiguration proxyConfiguration, String clientId) {
|
||||
return mockDropboxClient;
|
||||
}
|
||||
};
|
||||
|
||||
testRunner = TestRunners.newTestRunner(testSubject);
|
||||
|
||||
when(mockDropboxClient.files()).thenReturn(mockDbxUserFilesRequest);
|
||||
|
||||
mockStandardDropboxCredentialService();
|
||||
}
|
||||
|
||||
@Test
|
||||
void testFileIsDownloadedById() throws Exception {
|
||||
|
||||
testRunner.setProperty(FetchDropbox.FILE, "${dropbox.id}");
|
||||
|
||||
when(mockDbxUserFilesRequest.download(FILE_ID_1)).thenReturn(mockDbxDownloader);
|
||||
when(mockDbxDownloader.getInputStream()).thenReturn(new ByteArrayInputStream("content".getBytes(UTF_8)));
|
||||
|
||||
MockFlowFile inputFlowFile = getMockFlowFile(FILE_ID_1);
|
||||
testRunner.enqueue(inputFlowFile);
|
||||
testRunner.run();
|
||||
|
||||
testRunner.assertAllFlowFilesTransferred(FetchDropbox.REL_SUCCESS, 1);
|
||||
List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(FetchDropbox.REL_SUCCESS);
|
||||
MockFlowFile ff0 = flowFiles.get(0);
|
||||
ff0.assertContentEquals("content");
|
||||
assertOutFlowFileAttributes(ff0, FILE_ID_1);
|
||||
}
|
||||
|
||||
@Test
|
||||
void testFileIsDownloadedByPath() throws Exception {
|
||||
testRunner.setProperty(FetchDropbox.FILE, "${path}/${filename}");
|
||||
|
||||
when(mockDbxUserFilesRequest.download(FOLDER + "/" + FILENAME)).thenReturn(mockDbxDownloader);
|
||||
when(mockDbxDownloader.getInputStream()).thenReturn(new ByteArrayInputStream("contentByPath".getBytes(UTF_8)));
|
||||
|
||||
MockFlowFile inputFlowFile = getMockFlowFile(FILE_ID_1);
|
||||
testRunner.enqueue(inputFlowFile);
|
||||
testRunner.run();
|
||||
|
||||
testRunner.assertAllFlowFilesTransferred(FetchDropbox.REL_SUCCESS, 1);
|
||||
List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(FetchDropbox.REL_SUCCESS);
|
||||
MockFlowFile ff0 = flowFiles.get(0);
|
||||
ff0.assertContentEquals("contentByPath");
|
||||
assertOutFlowFileAttributes(ff0, FILE_ID_1);
|
||||
}
|
||||
|
||||
@Test
|
||||
void testFetchFails() throws Exception {
|
||||
testRunner.setProperty(FetchDropbox.FILE, "${dropbox.id}");
|
||||
|
||||
when(mockDbxUserFilesRequest.download(FILE_ID_2)).thenThrow(new DbxException("Error in Dropbox"));
|
||||
|
||||
MockFlowFile inputFlowFile = getMockFlowFile(FILE_ID_2);
|
||||
testRunner.enqueue(inputFlowFile);
|
||||
testRunner.run();
|
||||
|
||||
testRunner.assertAllFlowFilesTransferred(FetchDropbox.REL_FAILURE, 1);
|
||||
List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(FetchDropbox.REL_FAILURE);
|
||||
MockFlowFile ff0 = flowFiles.get(0);
|
||||
ff0.assertAttributeEquals("error.message", "Error in Dropbox");
|
||||
assertOutFlowFileAttributes(ff0, FILE_ID_2);
|
||||
}
|
||||
|
||||
private void mockStandardDropboxCredentialService() throws InitializationException {
|
||||
String credentialServiceId = "dropbox_credentials";
|
||||
when(credentialService.getIdentifier()).thenReturn(credentialServiceId);
|
||||
testRunner.addControllerService(credentialServiceId, credentialService);
|
||||
testRunner.enableControllerService(credentialService);
|
||||
testRunner.setProperty(FetchDropbox.CREDENTIAL_SERVICE, credentialServiceId);
|
||||
}
|
||||
|
||||
private MockFlowFile getMockFlowFile(String fileId) {
|
||||
MockFlowFile inputFlowFile = new MockFlowFile(0);
|
||||
Map<String, String> attributes = new HashMap<>();
|
||||
attributes.put(DropboxFileInfo.ID, fileId);
|
||||
attributes.put(DropboxFileInfo.REVISION, REVISION);
|
||||
attributes.put(DropboxFileInfo.FILENAME, FILENAME);
|
||||
attributes.put(DropboxFileInfo.PATH, FOLDER);
|
||||
attributes.put(DropboxFileInfo.SIZE, SIZE);
|
||||
attributes.put(DropboxFileInfo.TIMESTAMP, CREATED_TIME);
|
||||
inputFlowFile.putAttributes(attributes);
|
||||
return inputFlowFile;
|
||||
}
|
||||
|
||||
private void assertOutFlowFileAttributes(MockFlowFile flowFile, String fileId) {
|
||||
flowFile.assertAttributeEquals(DropboxFileInfo.ID, fileId);
|
||||
flowFile.assertAttributeEquals(DropboxFileInfo.REVISION, REVISION);
|
||||
flowFile.assertAttributeEquals(DropboxFileInfo.PATH, FOLDER);
|
||||
flowFile.assertAttributeEquals(DropboxFileInfo.SIZE, SIZE);
|
||||
flowFile.assertAttributeEquals(DropboxFileInfo.TIMESTAMP, CREATED_TIME);
|
||||
flowFile.assertAttributeEquals(DropboxFileInfo.FILENAME, FILENAME);
|
||||
}
|
||||
}
|
|
@ -53,12 +53,12 @@ public class ListDropboxIT extends AbstractDropboxIT<ListDropbox> {
|
|||
|
||||
@Test
|
||||
void testEmbeddedDirectoriesAreListed() throws Exception {
|
||||
createFile("test_file1", "test_file_content1", MAIN_FOLDER);
|
||||
createFile("test_file2", "test_file_content2", MAIN_FOLDER);
|
||||
createFile("test_file11", "test_file_content11", MAIN_FOLDER + "/testFolder1");
|
||||
createFile("test_file112", "test_file_content112", MAIN_FOLDER + "/testFolder2");
|
||||
createFile(MAIN_FOLDER, "test_file1", "test_file_content1");
|
||||
createFile(MAIN_FOLDER, "test_file2", "test_file_content2");
|
||||
createFile(MAIN_FOLDER + "/testFolder1", "test_file11", "test_file_content11");
|
||||
createFile(MAIN_FOLDER + "/testFolder2", "test_file112", "test_file_content112");
|
||||
|
||||
createFile("test_file_not_in_main_folder", "test_file_content31", NOT_MAIN_FOLDER);
|
||||
createFile(NOT_MAIN_FOLDER, "test_file_not_in_main_folder", "test_file_content31");
|
||||
|
||||
List<String> expectedFileNames = Arrays.asList("test_file1", "test_file2", "test_file11", "test_file112");
|
||||
|
||||
|
@ -77,8 +77,8 @@ public class ListDropboxIT extends AbstractDropboxIT<ListDropbox> {
|
|||
void testFolderIsListedById() throws Exception {
|
||||
testRunner.setProperty(ListDropbox.FOLDER, mainFolderId);
|
||||
|
||||
createFile("test_file1", "test_file_content1", MAIN_FOLDER);
|
||||
createFile("test_file11", "test_file_content11", MAIN_FOLDER + "/testFolder1");
|
||||
createFile(MAIN_FOLDER, "test_file1", "test_file_content1");
|
||||
createFile(MAIN_FOLDER + "/testFolder1", "test_file11", "test_file_content11");
|
||||
|
||||
List<String> expectedFileNames = Arrays.asList("test_file1", "test_file11");
|
||||
|
||||
|
@ -97,7 +97,7 @@ public class ListDropboxIT extends AbstractDropboxIT<ListDropbox> {
|
|||
void testTooYoungFilesNotListedWhenMinAgeIsSet() throws Exception {
|
||||
testRunner.setProperty(ListDropbox.MIN_AGE, "15 s");
|
||||
|
||||
createFile(YOUNG_FILE_NAME, "test_file_content1", MAIN_FOLDER);
|
||||
createFile(MAIN_FOLDER, YOUNG_FILE_NAME, "test_file_content1");
|
||||
|
||||
waitForFileCreation();
|
||||
|
||||
|
|
|
@ -20,10 +20,6 @@ package org.apache.nifi.processors.dropbox;
|
|||
import static java.util.Collections.singletonList;
|
||||
import static java.util.Spliterators.spliteratorUnknownSize;
|
||||
import static java.util.stream.Collectors.toList;
|
||||
import static org.apache.nifi.services.dropbox.StandardDropboxCredentialService.ACCESS_TOKEN;
|
||||
import static org.apache.nifi.services.dropbox.StandardDropboxCredentialService.APP_KEY;
|
||||
import static org.apache.nifi.services.dropbox.StandardDropboxCredentialService.APP_SECRET;
|
||||
import static org.apache.nifi.services.dropbox.StandardDropboxCredentialService.REFRESH_TOKEN;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
|
@ -45,11 +41,12 @@ import java.util.Date;
|
|||
import java.util.List;
|
||||
import java.util.Spliterator;
|
||||
import java.util.stream.StreamSupport;
|
||||
import org.apache.nifi.dropbox.credentials.service.DropboxCredentialService;
|
||||
import org.apache.nifi.json.JsonRecordSetWriter;
|
||||
import org.apache.nifi.processor.ProcessContext;
|
||||
import org.apache.nifi.proxy.ProxyConfiguration;
|
||||
import org.apache.nifi.reporting.InitializationException;
|
||||
import org.apache.nifi.serialization.RecordSetWriterFactory;
|
||||
import org.apache.nifi.services.dropbox.StandardDropboxCredentialService;
|
||||
import org.apache.nifi.util.MockFlowFile;
|
||||
import org.apache.nifi.util.TestRunner;
|
||||
import org.apache.nifi.util.TestRunners;
|
||||
|
@ -79,7 +76,7 @@ public class ListDropboxTest {
|
|||
private DbxClientV2 mockDropboxClient;
|
||||
|
||||
@Mock
|
||||
private StandardDropboxCredentialService credentialService;
|
||||
private DropboxCredentialService credentialService;
|
||||
|
||||
@Mock
|
||||
private DbxUserFilesRequests mockDbxUserFilesRequest;
|
||||
|
@ -94,7 +91,7 @@ public class ListDropboxTest {
|
|||
void setUp() throws Exception {
|
||||
ListDropbox testSubject = new ListDropbox() {
|
||||
@Override
|
||||
public DbxClientV2 getDropboxApiClient(ProcessContext context) {
|
||||
public DbxClientV2 getDropboxApiClient(ProcessContext context, ProxyConfiguration proxyConfiguration, String clientId) {
|
||||
return mockDropboxClient;
|
||||
}
|
||||
|
||||
|
@ -143,7 +140,7 @@ public class ListDropboxTest {
|
|||
//root is listed when "" is used in Dropbox API
|
||||
when(mockDbxUserFilesRequest.listFolderBuilder("")).thenReturn(mockListFolderBuilder);
|
||||
when(mockListFolderResult.getEntries()).thenReturn(singletonList(
|
||||
createFileMetaData(FILENAME_1, folderName, ID_1, CREATED_TIME)
|
||||
createFileMetadata(FILENAME_1, folderName, ID_1, CREATED_TIME)
|
||||
));
|
||||
|
||||
testRunner.run();
|
||||
|
@ -155,15 +152,16 @@ public class ListDropboxTest {
|
|||
}
|
||||
|
||||
@Test
|
||||
void testOnlyFilesAreListedFolderIsFiltered() throws Exception {
|
||||
void testOnlyFilesAreListedFoldersAndShortcutsAreFiltered() throws Exception {
|
||||
mockFileListing();
|
||||
|
||||
testRunner.setProperty(ListDropbox.FOLDER, TEST_FOLDER);
|
||||
|
||||
when(mockDbxUserFilesRequest.listFolderBuilder(TEST_FOLDER)).thenReturn(mockListFolderBuilder);
|
||||
when(mockListFolderResult.getEntries()).thenReturn(Arrays.asList(
|
||||
createFileMetaData(FILENAME_1, TEST_FOLDER, ID_1, CREATED_TIME),
|
||||
createFolderMetaData("testFolder1", TEST_FOLDER)
|
||||
createFileMetadata(FILENAME_1, TEST_FOLDER, ID_1, CREATED_TIME),
|
||||
createFolderMetadata("testFolder1", TEST_FOLDER),
|
||||
createFileMetadata(FILENAME_2, TEST_FOLDER, ID_2, CREATED_TIME, false)
|
||||
));
|
||||
|
||||
testRunner.run();
|
||||
|
@ -182,8 +180,8 @@ public class ListDropboxTest {
|
|||
|
||||
when(mockDbxUserFilesRequest.listFolderBuilder(TEST_FOLDER)).thenReturn(mockListFolderBuilder);
|
||||
when(mockListFolderResult.getEntries()).thenReturn(Arrays.asList(
|
||||
createFileMetaData(FILENAME_1, TEST_FOLDER, ID_1, CREATED_TIME),
|
||||
createFileMetaData(FILENAME_2, TEST_FOLDER, ID_2, OLD_CREATED_TIME)
|
||||
createFileMetadata(FILENAME_1, TEST_FOLDER, ID_1, CREATED_TIME),
|
||||
createFileMetadata(FILENAME_2, TEST_FOLDER, ID_2, OLD_CREATED_TIME)
|
||||
));
|
||||
|
||||
testRunner.run();
|
||||
|
@ -203,8 +201,8 @@ public class ListDropboxTest {
|
|||
|
||||
when(mockDbxUserFilesRequest.listFolderBuilder(TEST_FOLDER)).thenReturn(mockListFolderBuilder);
|
||||
when(mockListFolderResult.getEntries()).thenReturn(Arrays.asList(
|
||||
createFileMetaData(FILENAME_1, TEST_FOLDER, ID_1, CREATED_TIME),
|
||||
createFileMetaData(FILENAME_2, TEST_FOLDER, ID_2, CREATED_TIME)
|
||||
createFileMetadata(FILENAME_1, TEST_FOLDER, ID_1, CREATED_TIME),
|
||||
createFileMetadata(FILENAME_2, TEST_FOLDER, ID_2, CREATED_TIME)
|
||||
));
|
||||
|
||||
testRunner.run();
|
||||
|
@ -227,20 +225,30 @@ public class ListDropboxTest {
|
|||
flowFile.assertAttributeEquals(DropboxFileInfo.REVISION, REVISION);
|
||||
}
|
||||
|
||||
private FileMetadata createFileMetaData(
|
||||
private FileMetadata createFileMetadata(
|
||||
String filename,
|
||||
String parent,
|
||||
String id,
|
||||
long createdTime) {
|
||||
long createdTime,
|
||||
boolean isDownloadable) {
|
||||
return FileMetadata.newBuilder(filename, id,
|
||||
new Date(createdTime),
|
||||
new Date(createdTime),
|
||||
REVISION, SIZE)
|
||||
.withPathDisplay(parent + "/" + filename)
|
||||
.withIsDownloadable(isDownloadable)
|
||||
.build();
|
||||
}
|
||||
|
||||
private Metadata createFolderMetaData(String folderName, String parent) {
|
||||
private FileMetadata createFileMetadata(
|
||||
String filename,
|
||||
String parent,
|
||||
String id,
|
||||
long createdTime) {
|
||||
return createFileMetadata(filename, parent, id, createdTime, true);
|
||||
}
|
||||
|
||||
private Metadata createFolderMetadata(String folderName, String parent) {
|
||||
return FolderMetadata.newBuilder(folderName)
|
||||
.withPathDisplay(parent + "/" + folderName)
|
||||
.build();
|
||||
|
@ -250,10 +258,6 @@ public class ListDropboxTest {
|
|||
String credentialServiceId = "dropbox_credentials";
|
||||
when(credentialService.getIdentifier()).thenReturn(credentialServiceId);
|
||||
testRunner.addControllerService(credentialServiceId, credentialService);
|
||||
testRunner.setProperty(credentialService, APP_KEY, "appKey");
|
||||
testRunner.setProperty(credentialService, APP_SECRET, "appSecret");
|
||||
testRunner.setProperty(credentialService, ACCESS_TOKEN, "accessToken");
|
||||
testRunner.setProperty(credentialService, REFRESH_TOKEN, "refreshToken");
|
||||
testRunner.enableControllerService(credentialService);
|
||||
testRunner.setProperty(ListDropbox.CREDENTIAL_SERVICE, credentialServiceId);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue