mirror of https://github.com/apache/nifi.git
NIFI-1225: Changes for ENDPOINT_OVERRIDE and SSL Context Service from pr for NIFI-1107
Reviewed by Mark Payne <markap14@hotmail.com> and Tony Kurc (tkurc@apache.org)
This commit is contained in:
parent
fbaf1c36de
commit
55352ae5b1
|
@ -50,6 +50,10 @@
|
||||||
<artifactId>junit</artifactId>
|
<artifactId>junit</artifactId>
|
||||||
<scope>test</scope>
|
<scope>test</scope>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.nifi</groupId>
|
||||||
|
<artifactId>nifi-ssl-context-service-api</artifactId>
|
||||||
|
</dependency>
|
||||||
</dependencies>
|
</dependencies>
|
||||||
<build>
|
<build>
|
||||||
<plugins>
|
<plugins>
|
||||||
|
|
|
@ -27,6 +27,9 @@ import java.util.List;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import javax.net.ssl.SSLContext;
|
||||||
|
|
||||||
|
import org.apache.commons.lang3.StringUtils;
|
||||||
import org.apache.nifi.annotation.lifecycle.OnScheduled;
|
import org.apache.nifi.annotation.lifecycle.OnScheduled;
|
||||||
import org.apache.nifi.components.AllowableValue;
|
import org.apache.nifi.components.AllowableValue;
|
||||||
import org.apache.nifi.components.PropertyDescriptor;
|
import org.apache.nifi.components.PropertyDescriptor;
|
||||||
|
@ -37,6 +40,7 @@ import org.apache.nifi.processor.ProcessContext;
|
||||||
import org.apache.nifi.processor.Relationship;
|
import org.apache.nifi.processor.Relationship;
|
||||||
import org.apache.nifi.processor.exception.ProcessException;
|
import org.apache.nifi.processor.exception.ProcessException;
|
||||||
import org.apache.nifi.processor.util.StandardValidators;
|
import org.apache.nifi.processor.util.StandardValidators;
|
||||||
|
import org.apache.nifi.ssl.SSLContextService;
|
||||||
|
|
||||||
import com.amazonaws.AmazonWebServiceClient;
|
import com.amazonaws.AmazonWebServiceClient;
|
||||||
import com.amazonaws.ClientConfiguration;
|
import com.amazonaws.ClientConfiguration;
|
||||||
|
@ -45,6 +49,7 @@ import com.amazonaws.auth.AWSCredentials;
|
||||||
import com.amazonaws.auth.AnonymousAWSCredentials;
|
import com.amazonaws.auth.AnonymousAWSCredentials;
|
||||||
import com.amazonaws.auth.BasicAWSCredentials;
|
import com.amazonaws.auth.BasicAWSCredentials;
|
||||||
import com.amazonaws.auth.PropertiesCredentials;
|
import com.amazonaws.auth.PropertiesCredentials;
|
||||||
|
import com.amazonaws.http.conn.ssl.SdkTLSSocketFactory;
|
||||||
import com.amazonaws.regions.Region;
|
import com.amazonaws.regions.Region;
|
||||||
import com.amazonaws.regions.Regions;
|
import com.amazonaws.regions.Regions;
|
||||||
|
|
||||||
|
@ -92,6 +97,22 @@ public abstract class AbstractAWSProcessor<ClientType extends AmazonWebServiceCl
|
||||||
.defaultValue("30 secs")
|
.defaultValue("30 secs")
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
|
public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
|
||||||
|
.name("SSL Context Service")
|
||||||
|
.description("Specifies an optional SSL Context Service that, if provided, will be used to create connections")
|
||||||
|
.required(false)
|
||||||
|
.identifiesControllerService(SSLContextService.class)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
public static final PropertyDescriptor ENDPOINT_OVERRIDE = new PropertyDescriptor.Builder()
|
||||||
|
.name("Endpoint Override URL")
|
||||||
|
.description("Endpoint URL to use instead of the AWS default including scheme, host, port, and path. " +
|
||||||
|
"The AWS libraries select an endpoint URL based on the AWS region, but this property overrides " +
|
||||||
|
"the selected endpoint URL, allowing use with other S3-compatible endpoints.")
|
||||||
|
.required(false)
|
||||||
|
.addValidator(StandardValidators.URL_VALIDATOR)
|
||||||
|
.build();
|
||||||
|
|
||||||
private volatile ClientType client;
|
private volatile ClientType client;
|
||||||
private volatile Region region;
|
private volatile Region region;
|
||||||
|
|
||||||
|
@ -146,6 +167,13 @@ public abstract class AbstractAWSProcessor<ClientType extends AmazonWebServiceCl
|
||||||
config.setConnectionTimeout(commsTimeout);
|
config.setConnectionTimeout(commsTimeout);
|
||||||
config.setSocketTimeout(commsTimeout);
|
config.setSocketTimeout(commsTimeout);
|
||||||
|
|
||||||
|
final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
|
||||||
|
if (sslContextService != null) {
|
||||||
|
final SSLContext sslContext = sslContextService.createSSLContext(SSLContextService.ClientAuth.NONE);
|
||||||
|
SdkTLSSocketFactory sdkTLSSocketFactory = new SdkTLSSocketFactory(sslContext, null);
|
||||||
|
config.getApacheHttpClientConfig().setSslSocketFactory(sdkTLSSocketFactory);
|
||||||
|
}
|
||||||
|
|
||||||
return config;
|
return config;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -160,10 +188,17 @@ public abstract class AbstractAWSProcessor<ClientType extends AmazonWebServiceCl
|
||||||
if (region != null) {
|
if (region != null) {
|
||||||
this.region = Region.getRegion(Regions.fromName(region));
|
this.region = Region.getRegion(Regions.fromName(region));
|
||||||
client.setRegion(this.region);
|
client.setRegion(this.region);
|
||||||
} else{
|
} else {
|
||||||
this.region = null;
|
this.region = null;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// if the endpoint override has been configured, set the endpoint.
|
||||||
|
// (per Amazon docs this should only be configured at client creation)
|
||||||
|
final String urlstr = StringUtils.trimToEmpty(context.getProperty(ENDPOINT_OVERRIDE).getValue());
|
||||||
|
if (!urlstr.isEmpty()) {
|
||||||
|
this.client.setEndpoint(urlstr);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
protected abstract ClientType createClient(final ProcessContext context, final AWSCredentials credentials, final ClientConfiguration config);
|
protected abstract ClientType createClient(final ProcessContext context, final AWSCredentials credentials, final ClientConfiguration config);
|
||||||
|
|
|
@ -40,7 +40,7 @@ import org.apache.nifi.processor.util.StandardValidators;
|
||||||
|
|
||||||
|
|
||||||
@SupportsBatching
|
@SupportsBatching
|
||||||
@SeeAlso({PutS3Object.class})
|
@SeeAlso({PutS3Object.class, FetchS3Object.class})
|
||||||
@Tags({"Amazon", "S3", "AWS", "Archive", "Delete"})
|
@Tags({"Amazon", "S3", "AWS", "Archive", "Delete"})
|
||||||
@InputRequirement(Requirement.INPUT_REQUIRED)
|
@InputRequirement(Requirement.INPUT_REQUIRED)
|
||||||
@CapabilityDescription("Deletes FlowFiles on an Amazon S3 Bucket. " +
|
@CapabilityDescription("Deletes FlowFiles on an Amazon S3 Bucket. " +
|
||||||
|
@ -57,7 +57,8 @@ public class DeleteS3Object extends AbstractS3Processor {
|
||||||
|
|
||||||
public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
|
public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
|
||||||
Arrays.asList(KEY, BUCKET, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, REGION, TIMEOUT, VERSION_ID,
|
Arrays.asList(KEY, BUCKET, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, REGION, TIMEOUT, VERSION_ID,
|
||||||
FULL_CONTROL_USER_LIST, READ_USER_LIST, WRITE_USER_LIST, READ_ACL_LIST, WRITE_ACL_LIST, OWNER));
|
FULL_CONTROL_USER_LIST, READ_USER_LIST, WRITE_USER_LIST, READ_ACL_LIST, WRITE_ACL_LIST, OWNER,
|
||||||
|
SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE));
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||||
|
|
|
@ -46,7 +46,7 @@ import com.amazonaws.services.s3.model.ObjectMetadata;
|
||||||
import com.amazonaws.services.s3.model.S3Object;
|
import com.amazonaws.services.s3.model.S3Object;
|
||||||
|
|
||||||
@SupportsBatching
|
@SupportsBatching
|
||||||
@SeeAlso({PutS3Object.class})
|
@SeeAlso({PutS3Object.class, DeleteS3Object.class})
|
||||||
@InputRequirement(Requirement.INPUT_REQUIRED)
|
@InputRequirement(Requirement.INPUT_REQUIRED)
|
||||||
@Tags({"Amazon", "S3", "AWS", "Get", "Fetch"})
|
@Tags({"Amazon", "S3", "AWS", "Get", "Fetch"})
|
||||||
@CapabilityDescription("Retrieves the contents of an S3 Object and writes it to the content of a FlowFile")
|
@CapabilityDescription("Retrieves the contents of an S3 Object and writes it to the content of a FlowFile")
|
||||||
|
@ -73,7 +73,7 @@ public class FetchS3Object extends AbstractS3Processor {
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
|
public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
|
||||||
Arrays.asList(BUCKET, KEY, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, TIMEOUT, VERSION_ID));
|
Arrays.asList(BUCKET, KEY, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, TIMEOUT, VERSION_ID, SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE));
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||||
|
|
|
@ -19,6 +19,7 @@ package org.apache.nifi.processors.aws.s3;
|
||||||
import java.io.BufferedInputStream;
|
import java.io.BufferedInputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.Date;
|
import java.util.Date;
|
||||||
|
@ -27,6 +28,7 @@ import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import org.apache.commons.lang3.StringUtils;
|
||||||
import org.apache.nifi.annotation.behavior.DynamicProperty;
|
import org.apache.nifi.annotation.behavior.DynamicProperty;
|
||||||
import org.apache.nifi.annotation.behavior.InputRequirement;
|
import org.apache.nifi.annotation.behavior.InputRequirement;
|
||||||
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
|
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
|
||||||
|
@ -47,7 +49,7 @@ import org.apache.nifi.processor.io.InputStreamCallback;
|
||||||
import org.apache.nifi.processor.util.StandardValidators;
|
import org.apache.nifi.processor.util.StandardValidators;
|
||||||
|
|
||||||
import com.amazonaws.AmazonClientException;
|
import com.amazonaws.AmazonClientException;
|
||||||
import com.amazonaws.services.s3.AmazonS3;
|
import com.amazonaws.services.s3.AmazonS3Client;
|
||||||
import com.amazonaws.services.s3.model.AccessControlList;
|
import com.amazonaws.services.s3.model.AccessControlList;
|
||||||
import com.amazonaws.services.s3.model.ObjectMetadata;
|
import com.amazonaws.services.s3.model.ObjectMetadata;
|
||||||
import com.amazonaws.services.s3.model.PutObjectRequest;
|
import com.amazonaws.services.s3.model.PutObjectRequest;
|
||||||
|
@ -55,7 +57,7 @@ import com.amazonaws.services.s3.model.PutObjectResult;
|
||||||
import com.amazonaws.services.s3.model.StorageClass;
|
import com.amazonaws.services.s3.model.StorageClass;
|
||||||
|
|
||||||
@SupportsBatching
|
@SupportsBatching
|
||||||
@SeeAlso({FetchS3Object.class})
|
@SeeAlso({FetchS3Object.class, DeleteS3Object.class})
|
||||||
@InputRequirement(Requirement.INPUT_REQUIRED)
|
@InputRequirement(Requirement.INPUT_REQUIRED)
|
||||||
@Tags({"Amazon", "S3", "AWS", "Archive", "Put"})
|
@Tags({"Amazon", "S3", "AWS", "Archive", "Put"})
|
||||||
@CapabilityDescription("Puts FlowFiles to an Amazon S3 Bucket")
|
@CapabilityDescription("Puts FlowFiles to an Amazon S3 Bucket")
|
||||||
|
@ -63,9 +65,13 @@ import com.amazonaws.services.s3.model.StorageClass;
|
||||||
description = "Allows user-defined metadata to be added to the S3 object as key/value pairs", supportsExpressionLanguage = true)
|
description = "Allows user-defined metadata to be added to the S3 object as key/value pairs", supportsExpressionLanguage = true)
|
||||||
@ReadsAttribute(attribute = "filename", description = "Uses the FlowFile's filename as the filename for the S3 object")
|
@ReadsAttribute(attribute = "filename", description = "Uses the FlowFile's filename as the filename for the S3 object")
|
||||||
@WritesAttributes({
|
@WritesAttributes({
|
||||||
|
@WritesAttribute(attribute = "s3.bucket", description = "The S3 bucket where the Object was put in S3"),
|
||||||
|
@WritesAttribute(attribute = "s3.key", description = "The S3 key within where the Object was put in S3"),
|
||||||
@WritesAttribute(attribute = "s3.version", description = "The version of the S3 Object that was put to S3"),
|
@WritesAttribute(attribute = "s3.version", description = "The version of the S3 Object that was put to S3"),
|
||||||
@WritesAttribute(attribute = "s3.etag", description = "The ETag of the S3 Object"),
|
@WritesAttribute(attribute = "s3.etag", description = "The ETag of the S3 Object"),
|
||||||
@WritesAttribute(attribute = "s3.expiration", description = "A human-readable form of the expiration date of the S3 object, if one is set")
|
@WritesAttribute(attribute = "s3.expiration", description = "A human-readable form of the expiration date of the S3 object, if one is set"),
|
||||||
|
@WritesAttribute(attribute = "s3.uploadId", description = "The uploadId used to upload the Object to S3"),
|
||||||
|
@WritesAttribute(attribute = "s3.usermetadata", description = "A human-readable form of the User Metadata of the S3 object, if any was set")
|
||||||
})
|
})
|
||||||
public class PutS3Object extends AbstractS3Processor {
|
public class PutS3Object extends AbstractS3Processor {
|
||||||
|
|
||||||
|
@ -85,7 +91,17 @@ public class PutS3Object extends AbstractS3Processor {
|
||||||
|
|
||||||
public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
|
public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
|
||||||
Arrays.asList(KEY, BUCKET, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, STORAGE_CLASS, REGION, TIMEOUT, EXPIRATION_RULE_ID,
|
Arrays.asList(KEY, BUCKET, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, STORAGE_CLASS, REGION, TIMEOUT, EXPIRATION_RULE_ID,
|
||||||
FULL_CONTROL_USER_LIST, READ_USER_LIST, WRITE_USER_LIST, READ_ACL_LIST, WRITE_ACL_LIST, OWNER));
|
FULL_CONTROL_USER_LIST, READ_USER_LIST, WRITE_USER_LIST, READ_ACL_LIST, WRITE_ACL_LIST, OWNER, SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE));
|
||||||
|
|
||||||
|
final static String S3_BUCKET_KEY = "s3.bucket";
|
||||||
|
final static String S3_OBJECT_KEY = "s3.key";
|
||||||
|
final static String S3_UPLOAD_ID_ATTR_KEY = "s3.uploadId";
|
||||||
|
final static String S3_VERSION_ATTR_KEY = "s3.version";
|
||||||
|
final static String S3_ETAG_ATTR_KEY = "s3.etag";
|
||||||
|
final static String S3_EXPIRATION_ATTR_KEY = "s3.expiration";
|
||||||
|
final static String S3_STORAGECLASS_ATTR_KEY = "s3.storeClass";
|
||||||
|
final static String S3_STORAGECLASS_META_KEY = "x-amz-storage-class";
|
||||||
|
final static String S3_USERMETA_ATTR_KEY = "s3.usermetadata";
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||||
|
@ -114,9 +130,12 @@ public class PutS3Object extends AbstractS3Processor {
|
||||||
final String bucket = context.getProperty(BUCKET).evaluateAttributeExpressions(flowFile).getValue();
|
final String bucket = context.getProperty(BUCKET).evaluateAttributeExpressions(flowFile).getValue();
|
||||||
final String key = context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue();
|
final String key = context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue();
|
||||||
|
|
||||||
final AmazonS3 s3 = getClient();
|
final AmazonS3Client s3 = getClient();
|
||||||
final FlowFile ff = flowFile;
|
final FlowFile ff = flowFile;
|
||||||
final Map<String, String> attributes = new HashMap<>();
|
final Map<String, String> attributes = new HashMap<>();
|
||||||
|
attributes.put(S3_BUCKET_KEY, bucket);
|
||||||
|
attributes.put(S3_OBJECT_KEY, key);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
session.read(flowFile, new InputStreamCallback() {
|
session.read(flowFile, new InputStreamCallback() {
|
||||||
@Override
|
@Override
|
||||||
|
@ -152,14 +171,25 @@ public class PutS3Object extends AbstractS3Processor {
|
||||||
|
|
||||||
final PutObjectResult result = s3.putObject(request);
|
final PutObjectResult result = s3.putObject(request);
|
||||||
if (result.getVersionId() != null) {
|
if (result.getVersionId() != null) {
|
||||||
attributes.put("s3.version", result.getVersionId());
|
attributes.put(S3_VERSION_ATTR_KEY, result.getVersionId());
|
||||||
}
|
}
|
||||||
|
|
||||||
attributes.put("s3.etag", result.getETag());
|
attributes.put(S3_ETAG_ATTR_KEY, result.getETag());
|
||||||
|
|
||||||
final Date expiration = result.getExpirationTime();
|
final Date expiration = result.getExpirationTime();
|
||||||
if (expiration != null) {
|
if (expiration != null) {
|
||||||
attributes.put("s3.expiration", expiration.toString());
|
attributes.put(S3_EXPIRATION_ATTR_KEY, expiration.toString());
|
||||||
|
}
|
||||||
|
if (result.getMetadata().getRawMetadata().keySet().contains(S3_STORAGECLASS_META_KEY)) {
|
||||||
|
attributes.put(S3_STORAGECLASS_ATTR_KEY,
|
||||||
|
result.getMetadata().getRawMetadataValue(S3_STORAGECLASS_META_KEY).toString());
|
||||||
|
}
|
||||||
|
if (userMetadata.size() > 0) {
|
||||||
|
List<String> pairs = new ArrayList<String>();
|
||||||
|
for (String userKey : userMetadata.keySet()) {
|
||||||
|
pairs.add(userKey + "=" + userMetadata.get(userKey));
|
||||||
|
}
|
||||||
|
attributes.put(S3_USERMETA_ATTR_KEY, StringUtils.join(pairs, ", "));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -170,7 +200,7 @@ public class PutS3Object extends AbstractS3Processor {
|
||||||
}
|
}
|
||||||
session.transfer(flowFile, REL_SUCCESS);
|
session.transfer(flowFile, REL_SUCCESS);
|
||||||
|
|
||||||
final String url = getUrlForObject(bucket, key);
|
final String url = s3.getResourceUrl(bucket, key);
|
||||||
final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
|
final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
|
||||||
session.getProvenanceReporter().send(flowFile, url, millis);
|
session.getProvenanceReporter().send(flowFile, url, millis);
|
||||||
|
|
||||||
|
|
|
@ -18,8 +18,11 @@ package org.apache.nifi.processors.aws.s3;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
|
import org.apache.nifi.components.PropertyDescriptor;
|
||||||
|
import org.apache.nifi.util.MockFlowFile;
|
||||||
import org.apache.nifi.util.TestRunner;
|
import org.apache.nifi.util.TestRunner;
|
||||||
import org.apache.nifi.util.TestRunners;
|
import org.apache.nifi.util.TestRunners;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
|
@ -51,6 +54,33 @@ public class TestPutS3Object extends AbstractS3Test {
|
||||||
runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS, 3);
|
runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS, 3);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMetaData() throws IOException {
|
||||||
|
PutS3Object processor = new PutS3Object();
|
||||||
|
final TestRunner runner = TestRunners.newTestRunner(processor);
|
||||||
|
|
||||||
|
runner.setProperty(PutS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE);
|
||||||
|
runner.setProperty(PutS3Object.REGION, REGION);
|
||||||
|
runner.setProperty(PutS3Object.BUCKET, BUCKET_NAME);
|
||||||
|
PropertyDescriptor prop1 = processor.getSupportedDynamicPropertyDescriptor("TEST-PROP-1");
|
||||||
|
runner.setProperty(prop1, "TESTING-1-2-3");
|
||||||
|
PropertyDescriptor prop2 = processor.getSupportedDynamicPropertyDescriptor("TEST-PROP-2");
|
||||||
|
runner.setProperty(prop2, "TESTING-4-5-6");
|
||||||
|
|
||||||
|
final Map<String, String> attrs = new HashMap<>();
|
||||||
|
attrs.put("filename", "meta.txt");
|
||||||
|
runner.enqueue(getResourcePath(SAMPLE_FILE_RESOURCE_NAME), attrs);
|
||||||
|
|
||||||
|
runner.run();
|
||||||
|
|
||||||
|
runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS, 1);
|
||||||
|
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutS3Object.REL_SUCCESS);
|
||||||
|
MockFlowFile ff1 = flowFiles.get(0);
|
||||||
|
for (Map.Entry attrib : ff1.getAttributes().entrySet()) {
|
||||||
|
System.out.println(attrib.getKey() + " = " + attrib.getValue());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testPutInFolder() throws IOException {
|
public void testPutInFolder() throws IOException {
|
||||||
final TestRunner runner = TestRunners.newTestRunner(new PutS3Object());
|
final TestRunner runner = TestRunners.newTestRunner(new PutS3Object());
|
||||||
|
|
Loading…
Reference in New Issue