mirror of
https://github.com/apache/nifi.git
synced 2025-02-16 15:06:00 +00:00
Merge branch 'NIFI-25' into develop
This commit is contained in:
commit
58ea7af9a7
@ -534,6 +534,22 @@ The following binary components are provided under the Apache Software License v
|
||||
|
||||
This is free software, licensed under the Apache License, Version 2.0.
|
||||
|
||||
(ASLv2) Amazon Web Services SDK
|
||||
The following NOTICE information applies:
|
||||
Copyright 2010-2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
|
||||
This product includes software developed by
|
||||
Amazon Technologies, Inc (http://www.amazon.com/).
|
||||
|
||||
**********************
|
||||
THIRD PARTY COMPONENTS
|
||||
**********************
|
||||
This software includes third party software subject to the following copyrights:
|
||||
- XML parsing and utility functions from JetS3t - Copyright 2006-2009 James Murty.
|
||||
- JSON parsing and utility functions from JSON.org - Copyright 2002 JSON.org.
|
||||
- PKCS#1 PEM encoded private key parsing and utility functions from oauth.googlecode.com - Copyright 1998-2010 AOL Inc.
|
||||
|
||||
|
||||
************************
|
||||
Common Development and Distribution License 1.1
|
||||
************************
|
||||
|
36
nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-nar/pom.xml
Normal file
36
nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-nar/pom.xml
Normal file
@ -0,0 +1,36 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
contributor license agreements. See the NOTICE file distributed with
|
||||
this work for additional information regarding copyright ownership.
|
||||
The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
(the "License"); you may not use this file except in compliance with
|
||||
the License. You may obtain a copy of the License at
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
-->
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<parent>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-aws-bundle</artifactId>
|
||||
<version>0.1.0-incubating-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<artifactId>nifi-aws-nar</artifactId>
|
||||
<packaging>nar</packaging>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-aws-processors</artifactId>
|
||||
<version>0.1.0-incubating-SNAPSHOT</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
</project>
|
@ -0,0 +1,58 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
contributor license agreements. See the NOTICE file distributed with
|
||||
this work for additional information regarding copyright ownership.
|
||||
The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
(the "License"); you may not use this file except in compliance with
|
||||
the License. You may obtain a copy of the License at
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
-->
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<parent>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-aws-bundle</artifactId>
|
||||
<version>0.1.0-incubating-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<artifactId>nifi-aws-processors</artifactId>
|
||||
<packaging>jar</packaging>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-api</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-processor-utils</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.amazonaws</groupId>
|
||||
<artifactId>aws-java-sdk</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-mock</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.slf4j</groupId>
|
||||
<artifactId>slf4j-simple</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>junit</groupId>
|
||||
<artifactId>junit</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</project>
|
@ -0,0 +1,195 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.processors.aws;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.nifi.annotation.lifecycle.OnScheduled;
|
||||
import org.apache.nifi.components.AllowableValue;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.components.ValidationContext;
|
||||
import org.apache.nifi.components.ValidationResult;
|
||||
import org.apache.nifi.processor.AbstractProcessor;
|
||||
import org.apache.nifi.processor.ProcessContext;
|
||||
import org.apache.nifi.processor.Relationship;
|
||||
import org.apache.nifi.processor.exception.ProcessException;
|
||||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
|
||||
import com.amazonaws.AmazonWebServiceClient;
|
||||
import com.amazonaws.ClientConfiguration;
|
||||
import com.amazonaws.auth.AWSCredentials;
|
||||
import com.amazonaws.auth.AnonymousAWSCredentials;
|
||||
import com.amazonaws.auth.BasicAWSCredentials;
|
||||
import com.amazonaws.auth.PropertiesCredentials;
|
||||
import com.amazonaws.regions.Region;
|
||||
import com.amazonaws.regions.Regions;
|
||||
|
||||
public abstract class AbstractAWSProcessor<ClientType extends AmazonWebServiceClient> extends AbstractProcessor {
|
||||
|
||||
public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
|
||||
.description("FlowFiles are routed to success after being successfully copied to Amazon S3").build();
|
||||
public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
|
||||
.description("FlowFiles are routed to failure if unable to be copied to Amazon S3").build();
|
||||
|
||||
public static final Set<Relationship> relationships = Collections.unmodifiableSet(
|
||||
new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE)));
|
||||
|
||||
public static final PropertyDescriptor CREDENTAILS_FILE = new PropertyDescriptor.Builder()
|
||||
.name("Credentials File")
|
||||
.expressionLanguageSupported(false)
|
||||
.required(false)
|
||||
.addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
|
||||
.build();
|
||||
public static final PropertyDescriptor ACCESS_KEY = new PropertyDescriptor.Builder()
|
||||
.name("Access Key")
|
||||
.expressionLanguageSupported(false)
|
||||
.required(false)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.sensitive(true)
|
||||
.build();
|
||||
public static final PropertyDescriptor SECRET_KEY = new PropertyDescriptor.Builder()
|
||||
.name("Secret Key")
|
||||
.expressionLanguageSupported(false)
|
||||
.required(false)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.sensitive(true)
|
||||
.build();
|
||||
public static final PropertyDescriptor REGION = new PropertyDescriptor.Builder()
|
||||
.name("Region")
|
||||
.required(true)
|
||||
.allowableValues(getAvailableRegions())
|
||||
.defaultValue(createAllowableValue(Regions.DEFAULT_REGION).getValue())
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor TIMEOUT = new PropertyDescriptor.Builder()
|
||||
.name("Communications Timeout")
|
||||
.required(true)
|
||||
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
|
||||
.defaultValue("30 secs")
|
||||
.build();
|
||||
|
||||
|
||||
private volatile ClientType client;
|
||||
|
||||
|
||||
private static AllowableValue createAllowableValue(final Regions regions) {
|
||||
return new AllowableValue(regions.getName(), regions.getName(), regions.getName());
|
||||
}
|
||||
|
||||
private static AllowableValue[] getAvailableRegions() {
|
||||
final List<AllowableValue> values = new ArrayList<>();
|
||||
for ( final Regions regions : Regions.values() ) {
|
||||
values.add(createAllowableValue(regions));
|
||||
}
|
||||
|
||||
return (AllowableValue[]) values.toArray(new AllowableValue[values.size()]);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<Relationship> getRelationships() {
|
||||
return relationships;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
|
||||
final List<ValidationResult> problems = new ArrayList<>(super.customValidate(validationContext));
|
||||
|
||||
final boolean accessKeySet = validationContext.getProperty(ACCESS_KEY).isSet();
|
||||
final boolean secretKeySet = validationContext.getProperty(SECRET_KEY).isSet();
|
||||
if ( (accessKeySet && !secretKeySet) || (secretKeySet && !accessKeySet) ) {
|
||||
problems.add(new ValidationResult.Builder().input("Access Key").valid(false).explanation("If setting Secret Key or Access Key, must set both").build());
|
||||
}
|
||||
|
||||
final boolean credentialsFileSet = validationContext.getProperty(CREDENTAILS_FILE).isSet();
|
||||
if ( (secretKeySet || accessKeySet) && credentialsFileSet ) {
|
||||
problems.add(new ValidationResult.Builder().input("Access Key").valid(false).explanation("Cannot set both Credentials File and Secret Key/Access Key").build());
|
||||
}
|
||||
|
||||
return problems;
|
||||
}
|
||||
|
||||
|
||||
protected ClientConfiguration createConfiguration(final ProcessContext context) {
|
||||
final ClientConfiguration config = new ClientConfiguration();
|
||||
config.setMaxConnections(context.getMaxConcurrentTasks());
|
||||
config.setMaxErrorRetry(0);
|
||||
config.setUserAgent("NiFi");
|
||||
|
||||
final int commsTimeout = context.getProperty(TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
|
||||
config.setConnectionTimeout(commsTimeout);
|
||||
config.setSocketTimeout(commsTimeout);
|
||||
|
||||
return config;
|
||||
}
|
||||
|
||||
|
||||
@OnScheduled
|
||||
public void onScheduled(final ProcessContext context) {
|
||||
final ClientType awsClient = createClient(context, getCredentials(context), createConfiguration(context));
|
||||
this.client = awsClient;
|
||||
|
||||
// if the processor supports REGION, get the configured region.
|
||||
if ( getSupportedPropertyDescriptors().contains(REGION) ) {
|
||||
final String region = context.getProperty(REGION).getValue();
|
||||
if ( region != null ) {
|
||||
client.setRegion(Region.getRegion(Regions.fromName(region)));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
protected abstract ClientType createClient(final ProcessContext context, final AWSCredentials credentials, final ClientConfiguration config);
|
||||
|
||||
protected ClientType getClient() {
|
||||
return client;
|
||||
}
|
||||
|
||||
protected AWSCredentials getCredentials(final ProcessContext context) {
|
||||
final String accessKey = context.getProperty(ACCESS_KEY).getValue();
|
||||
final String secretKey = context.getProperty(SECRET_KEY).getValue();
|
||||
|
||||
final String credentialsFile = context.getProperty(CREDENTAILS_FILE).getValue();
|
||||
|
||||
if ( credentialsFile != null ) {
|
||||
try {
|
||||
return new PropertiesCredentials(new File(credentialsFile));
|
||||
} catch (final IOException ioe) {
|
||||
throw new ProcessException("Could not read Credentials File", ioe);
|
||||
}
|
||||
}
|
||||
|
||||
if ( accessKey != null && secretKey != null ) {
|
||||
return new BasicAWSCredentials(accessKey, secretKey);
|
||||
}
|
||||
|
||||
return new AnonymousAWSCredentials();
|
||||
}
|
||||
|
||||
|
||||
protected boolean isEmpty(final String value) {
|
||||
return value == null || value.trim().equals("");
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,171 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.processors.aws.s3;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.flowfile.FlowFile;
|
||||
import org.apache.nifi.processor.ProcessContext;
|
||||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
import org.apache.nifi.processors.aws.AbstractAWSProcessor;
|
||||
|
||||
import com.amazonaws.ClientConfiguration;
|
||||
import com.amazonaws.auth.AWSCredentials;
|
||||
import com.amazonaws.services.s3.AmazonS3Client;
|
||||
import com.amazonaws.services.s3.model.AccessControlList;
|
||||
import com.amazonaws.services.s3.model.CanonicalGrantee;
|
||||
import com.amazonaws.services.s3.model.EmailAddressGrantee;
|
||||
import com.amazonaws.services.s3.model.Grantee;
|
||||
import com.amazonaws.services.s3.model.Owner;
|
||||
import com.amazonaws.services.s3.model.Permission;
|
||||
|
||||
public abstract class AbstractS3Processor extends AbstractAWSProcessor<AmazonS3Client> {
|
||||
|
||||
public static final PropertyDescriptor FULL_CONTROL_USER_LIST = new PropertyDescriptor.Builder()
|
||||
.name("FullControl User List")
|
||||
.required(false)
|
||||
.expressionLanguageSupported(true)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.description("A comma-separated list of Amazon User ID's or E-mail addresses that specifies who should have Full Control for an object")
|
||||
.defaultValue("${s3.permissions.full.users}")
|
||||
.build();
|
||||
public static final PropertyDescriptor READ_USER_LIST = new PropertyDescriptor.Builder()
|
||||
.name("Read Permission User List")
|
||||
.required(false)
|
||||
.expressionLanguageSupported(true)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.description("A comma-separated list of Amazon User ID's or E-mail addresses that specifies who should have Read Access for an object")
|
||||
.defaultValue("${s3.permissions.read.users}")
|
||||
.build();
|
||||
public static final PropertyDescriptor WRITE_USER_LIST = new PropertyDescriptor.Builder()
|
||||
.name("Write Permission User List")
|
||||
.required(false)
|
||||
.expressionLanguageSupported(true)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.description("A comma-separated list of Amazon User ID's or E-mail addresses that specifies who should have Write Access for an object")
|
||||
.defaultValue("${s3.permissions.write.users}")
|
||||
.build();
|
||||
public static final PropertyDescriptor READ_ACL_LIST = new PropertyDescriptor.Builder()
|
||||
.name("Read ACL User List")
|
||||
.required(false)
|
||||
.expressionLanguageSupported(true)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.description("A comma-separated list of Amazon User ID's or E-mail addresses that specifies who should have permissions to read the Access Control List for an object")
|
||||
.defaultValue("${s3.permissions.readacl.users}")
|
||||
.build();
|
||||
public static final PropertyDescriptor WRITE_ACL_LIST = new PropertyDescriptor.Builder()
|
||||
.name("Write ACL User List")
|
||||
.required(false)
|
||||
.expressionLanguageSupported(true)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.description("A comma-separated list of Amazon User ID's or E-mail addresses that specifies who should have permissions to change the Access Control List for an object")
|
||||
.defaultValue("${s3.permissions.writeacl.users}")
|
||||
.build();
|
||||
public static final PropertyDescriptor OWNER = new PropertyDescriptor.Builder()
|
||||
.name("Owner")
|
||||
.required(false)
|
||||
.expressionLanguageSupported(true)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.description("The Amazon ID to use for the object's owner")
|
||||
.defaultValue("${s3.owner}")
|
||||
.build();
|
||||
public static final PropertyDescriptor BUCKET = new PropertyDescriptor.Builder()
|
||||
.name("Bucket")
|
||||
.expressionLanguageSupported(true)
|
||||
.required(true)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.build();
|
||||
public static final PropertyDescriptor KEY = new PropertyDescriptor.Builder()
|
||||
.name("Object Key")
|
||||
.required(true)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.expressionLanguageSupported(true)
|
||||
.defaultValue("${filename}")
|
||||
.build();
|
||||
|
||||
|
||||
@Override
|
||||
protected AmazonS3Client createClient(final ProcessContext context, final AWSCredentials credentials, final ClientConfiguration config) {
|
||||
return new AmazonS3Client(credentials, config);
|
||||
}
|
||||
|
||||
|
||||
protected Grantee createGrantee(final String value) {
|
||||
if ( isEmpty(value) ) {
|
||||
return null;
|
||||
}
|
||||
|
||||
if ( value.contains("@") ) {
|
||||
return new EmailAddressGrantee(value);
|
||||
} else {
|
||||
return new CanonicalGrantee(value);
|
||||
}
|
||||
}
|
||||
|
||||
protected final List<Grantee> createGrantees(final String value) {
|
||||
if ( isEmpty(value) ) {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
|
||||
final List<Grantee> grantees = new ArrayList<>();
|
||||
final String[] vals = value.split(",");
|
||||
for ( final String val : vals ) {
|
||||
final String identifier = val.trim();
|
||||
final Grantee grantee = createGrantee(identifier);
|
||||
if ( grantee != null ) {
|
||||
grantees.add(grantee);
|
||||
}
|
||||
}
|
||||
return grantees;
|
||||
}
|
||||
|
||||
protected final AccessControlList createACL(final ProcessContext context, final FlowFile flowFile) {
|
||||
final AccessControlList acl = new AccessControlList();
|
||||
|
||||
final String ownerId = context.getProperty(OWNER).evaluateAttributeExpressions(flowFile).getValue();
|
||||
if ( !isEmpty(ownerId) ) {
|
||||
final Owner owner = new Owner();
|
||||
owner.setId(ownerId);
|
||||
acl.setOwner(owner);
|
||||
}
|
||||
|
||||
for ( final Grantee grantee : createGrantees(context.getProperty(FULL_CONTROL_USER_LIST).evaluateAttributeExpressions(flowFile).getValue())) {
|
||||
acl.grantPermission(grantee, Permission.FullControl);
|
||||
}
|
||||
|
||||
for ( final Grantee grantee : createGrantees(context.getProperty(READ_USER_LIST).evaluateAttributeExpressions(flowFile).getValue())) {
|
||||
acl.grantPermission(grantee, Permission.Read);
|
||||
}
|
||||
|
||||
for ( final Grantee grantee : createGrantees(context.getProperty(WRITE_USER_LIST).evaluateAttributeExpressions(flowFile).getValue())) {
|
||||
acl.grantPermission(grantee, Permission.Write);
|
||||
}
|
||||
|
||||
for ( final Grantee grantee : createGrantees(context.getProperty(READ_ACL_LIST).evaluateAttributeExpressions(flowFile).getValue())) {
|
||||
acl.grantPermission(grantee, Permission.ReadAcp);
|
||||
}
|
||||
|
||||
for ( final Grantee grantee : createGrantees(context.getProperty(WRITE_ACL_LIST).evaluateAttributeExpressions(flowFile).getValue())) {
|
||||
acl.grantPermission(grantee, Permission.WriteAcp);
|
||||
}
|
||||
|
||||
return acl;
|
||||
}
|
||||
}
|
@ -0,0 +1,157 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.processors.aws.s3;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.nifi.annotation.behavior.SupportsBatching;
|
||||
import org.apache.nifi.annotation.behavior.WritesAttribute;
|
||||
import org.apache.nifi.annotation.behavior.WritesAttributes;
|
||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||
import org.apache.nifi.annotation.documentation.SeeAlso;
|
||||
import org.apache.nifi.annotation.documentation.Tags;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.flowfile.FlowFile;
|
||||
import org.apache.nifi.flowfile.attributes.CoreAttributes;
|
||||
import org.apache.nifi.processor.ProcessContext;
|
||||
import org.apache.nifi.processor.ProcessSession;
|
||||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
|
||||
import com.amazonaws.AmazonClientException;
|
||||
import com.amazonaws.services.s3.AmazonS3;
|
||||
import com.amazonaws.services.s3.model.GetObjectRequest;
|
||||
import com.amazonaws.services.s3.model.ObjectMetadata;
|
||||
import com.amazonaws.services.s3.model.S3Object;
|
||||
|
||||
|
||||
@SupportsBatching
|
||||
@SeeAlso({PutS3Object.class})
|
||||
@Tags({"Amazon", "S3", "AWS", "Get", "Fetch"})
|
||||
@CapabilityDescription("Retrieves the contents of an S3 Object and writes it to the content of a FlowFile")
|
||||
@WritesAttributes({
|
||||
@WritesAttribute(attribute="s3.bucket", description="The name of the S3 bucket"),
|
||||
@WritesAttribute(attribute="path", description="The path of the file"),
|
||||
@WritesAttribute(attribute="absolute.path", description="The path of the file"),
|
||||
@WritesAttribute(attribute="filename", description="The name of the file"),
|
||||
@WritesAttribute(attribute="hash.value", description="The MD5 sum of the file"),
|
||||
@WritesAttribute(attribute="hash.algorithm", description="MD5"),
|
||||
@WritesAttribute(attribute="mime.type", description="If S3 provides the content type/MIME type, this attribute will hold that file"),
|
||||
@WritesAttribute(attribute="s3.etag", description="The ETag that can be used to see if the file has changed"),
|
||||
@WritesAttribute(attribute="s3.expirationTime", description="If the file has an expiration date, this attribute will be set, containing the milliseconds since epoch in UTC time"),
|
||||
@WritesAttribute(attribute="s3.expirationTimeRuleId", description="The ID of the rule that dictates this object's expiration time"),
|
||||
@WritesAttribute(attribute="s3.version", description="The version of the S3 object"),
|
||||
})
|
||||
public class FetchS3Object extends AbstractS3Processor {
|
||||
|
||||
public static final PropertyDescriptor VERSION_ID = new PropertyDescriptor.Builder()
|
||||
.name("Version")
|
||||
.description("The Version of the Object to download")
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.expressionLanguageSupported(true)
|
||||
.required(false)
|
||||
.build();
|
||||
|
||||
public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
|
||||
Arrays.asList(BUCKET, KEY, REGION, ACCESS_KEY, SECRET_KEY, CREDENTAILS_FILE, TIMEOUT, VERSION_ID) );
|
||||
|
||||
@Override
|
||||
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||
return properties;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onTrigger(final ProcessContext context, final ProcessSession session) {
|
||||
FlowFile flowFile = session.get();
|
||||
if ( flowFile == null ) {
|
||||
return;
|
||||
}
|
||||
|
||||
final long startNanos = System.nanoTime();
|
||||
final String bucket = context.getProperty(BUCKET).evaluateAttributeExpressions(flowFile).getValue();
|
||||
final String key = context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue();
|
||||
final String versionId = context.getProperty(VERSION_ID).evaluateAttributeExpressions(flowFile).getValue();
|
||||
|
||||
final AmazonS3 client = getClient();
|
||||
final GetObjectRequest request;
|
||||
if ( versionId == null ) {
|
||||
request = new GetObjectRequest(bucket, key);
|
||||
} else {
|
||||
request = new GetObjectRequest(bucket, key, versionId);
|
||||
}
|
||||
|
||||
final Map<String, String> attributes = new HashMap<>();
|
||||
try (final S3Object s3Object = client.getObject(request)) {
|
||||
flowFile = session.importFrom(s3Object.getObjectContent(), flowFile);
|
||||
attributes.put("s3.bucket", s3Object.getBucketName());
|
||||
|
||||
final ObjectMetadata metadata = s3Object.getObjectMetadata();
|
||||
if ( metadata.getContentDisposition() != null ) {
|
||||
final String fullyQualified = metadata.getContentDisposition();
|
||||
final int lastSlash = fullyQualified.lastIndexOf("/");
|
||||
if ( lastSlash > -1 && lastSlash < fullyQualified.length() - 1 ) {
|
||||
attributes.put(CoreAttributes.PATH.key(), fullyQualified.substring(0, lastSlash));
|
||||
attributes.put(CoreAttributes.ABSOLUTE_PATH.key(), fullyQualified);
|
||||
attributes.put(CoreAttributes.FILENAME.key(), fullyQualified.substring(lastSlash + 1));
|
||||
} else {
|
||||
attributes.put(CoreAttributes.FILENAME.key(), metadata.getContentDisposition());
|
||||
}
|
||||
}
|
||||
if (metadata.getContentMD5() != null ) {
|
||||
attributes.put("hash.value", metadata.getContentMD5());
|
||||
attributes.put("hash.algorithm", "MD5");
|
||||
}
|
||||
if ( metadata.getContentType() != null ) {
|
||||
attributes.put(CoreAttributes.MIME_TYPE.key(), metadata.getContentType());
|
||||
}
|
||||
if ( metadata.getETag() != null ) {
|
||||
attributes.put("s3.etag", metadata.getETag());
|
||||
}
|
||||
if ( metadata.getExpirationTime() != null ) {
|
||||
attributes.put("s3.expirationTime", String.valueOf(metadata.getExpirationTime().getTime()));
|
||||
}
|
||||
if ( metadata.getExpirationTimeRuleId() != null ) {
|
||||
attributes.put("s3.expirationTimeRuleId", metadata.getExpirationTimeRuleId());
|
||||
}
|
||||
if ( metadata.getUserMetadata() != null ) {
|
||||
attributes.putAll(metadata.getUserMetadata());
|
||||
}
|
||||
if ( metadata.getVersionId() != null ) {
|
||||
attributes.put("s3.version", metadata.getVersionId());
|
||||
}
|
||||
} catch (final IOException | AmazonClientException ioe) {
|
||||
getLogger().error("Failed to retrieve S3 Object for {}; routing to failure", new Object[] {flowFile, ioe});
|
||||
session.transfer(flowFile, REL_FAILURE);
|
||||
return;
|
||||
}
|
||||
|
||||
if ( !attributes.isEmpty() ) {
|
||||
flowFile = session.putAllAttributes(flowFile, attributes);
|
||||
}
|
||||
|
||||
session.transfer(flowFile, REL_SUCCESS);
|
||||
final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
|
||||
getLogger().info("Successfully retrieved S3 Object for {} in {} millis; routing to success", new Object[] {flowFile, transferMillis});
|
||||
session.getProvenanceReporter().receive(flowFile, "http://" + bucket + ".amazonaws.com/" + key, transferMillis);
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,181 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.processors.aws.s3;
|
||||
|
||||
import java.io.BufferedInputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.Date;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.nifi.annotation.behavior.DynamicProperty;
|
||||
import org.apache.nifi.annotation.behavior.ReadsAttribute;
|
||||
import org.apache.nifi.annotation.behavior.SupportsBatching;
|
||||
import org.apache.nifi.annotation.behavior.WritesAttribute;
|
||||
import org.apache.nifi.annotation.behavior.WritesAttributes;
|
||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||
import org.apache.nifi.annotation.documentation.SeeAlso;
|
||||
import org.apache.nifi.annotation.documentation.Tags;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.flowfile.FlowFile;
|
||||
import org.apache.nifi.flowfile.attributes.CoreAttributes;
|
||||
import org.apache.nifi.processor.ProcessContext;
|
||||
import org.apache.nifi.processor.ProcessSession;
|
||||
import org.apache.nifi.processor.exception.ProcessException;
|
||||
import org.apache.nifi.processor.io.InputStreamCallback;
|
||||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
|
||||
import com.amazonaws.AmazonClientException;
|
||||
import com.amazonaws.services.s3.AmazonS3;
|
||||
import com.amazonaws.services.s3.model.AccessControlList;
|
||||
import com.amazonaws.services.s3.model.ObjectMetadata;
|
||||
import com.amazonaws.services.s3.model.PutObjectRequest;
|
||||
import com.amazonaws.services.s3.model.PutObjectResult;
|
||||
import com.amazonaws.services.s3.model.StorageClass;
|
||||
|
||||
@SupportsBatching
|
||||
@SeeAlso({FetchS3Object.class})
|
||||
@Tags({"Amazon", "S3", "AWS", "Archive", "Put"})
|
||||
@CapabilityDescription("Puts FlowFiles to an Amazon S3 Bucket")
|
||||
@DynamicProperty(name="The name of a User-Defined Metadata field to add to the S3 Object",
|
||||
value="The value of a User-Defined Metadata field to add to the S3 Object",
|
||||
description="Allows user-defined metadata to be added to the S3 object as key/value pairs",
|
||||
supportsExpressionLanguage=true)
|
||||
@ReadsAttribute(attribute="filename", description="Uses the FlowFile's filename as the filename for the S3 object")
|
||||
@WritesAttributes({
|
||||
@WritesAttribute(attribute="s3.version", description="The version of the S3 Object that was put to S3"),
|
||||
@WritesAttribute(attribute="s3.etag", description="The ETag of the S3 Object"),
|
||||
@WritesAttribute(attribute="s3.expiration", description="A human-readable form of the expiration date of the S3 object, if one is set")
|
||||
})
|
||||
public class PutS3Object extends AbstractS3Processor {
|
||||
public static final PropertyDescriptor EXPIRATION_RULE_ID = new PropertyDescriptor.Builder()
|
||||
.name("Expiration Time Rule")
|
||||
.required(false)
|
||||
.expressionLanguageSupported(true)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor STORAGE_CLASS = new PropertyDescriptor.Builder()
|
||||
.name("Storage Class")
|
||||
.required(true)
|
||||
.allowableValues(StorageClass.Standard.name(), StorageClass.ReducedRedundancy.name())
|
||||
.defaultValue(StorageClass.Standard.name())
|
||||
.build();
|
||||
|
||||
public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
|
||||
Arrays.asList(KEY, BUCKET, ACCESS_KEY, SECRET_KEY, CREDENTAILS_FILE, STORAGE_CLASS, REGION, TIMEOUT, EXPIRATION_RULE_ID,
|
||||
FULL_CONTROL_USER_LIST, READ_USER_LIST, WRITE_USER_LIST, READ_ACL_LIST, WRITE_ACL_LIST, OWNER) );
|
||||
|
||||
|
||||
@Override
|
||||
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||
return properties;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
|
||||
return new PropertyDescriptor.Builder()
|
||||
.name(propertyDescriptorName)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.expressionLanguageSupported(true)
|
||||
.dynamic(true)
|
||||
.build();
|
||||
}
|
||||
|
||||
public void onTrigger(final ProcessContext context, final ProcessSession session) {
|
||||
FlowFile flowFile = session.get();
|
||||
if ( flowFile == null ) {
|
||||
return;
|
||||
}
|
||||
|
||||
final long startNanos = System.nanoTime();
|
||||
|
||||
final String bucket = context.getProperty(BUCKET).evaluateAttributeExpressions(flowFile).getValue();
|
||||
final String key = context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue();
|
||||
|
||||
final AmazonS3 s3 = getClient();
|
||||
final FlowFile ff = flowFile;
|
||||
final Map<String, String> attributes = new HashMap<>();
|
||||
try {
|
||||
session.read(flowFile, new InputStreamCallback() {
|
||||
@Override
|
||||
public void process(final InputStream rawIn) throws IOException {
|
||||
try (final InputStream in = new BufferedInputStream(rawIn)) {
|
||||
final ObjectMetadata objectMetadata = new ObjectMetadata();
|
||||
objectMetadata.setContentDisposition(ff.getAttribute(CoreAttributes.FILENAME.key()));
|
||||
objectMetadata.setContentLength(ff.getSize());
|
||||
|
||||
final String expirationRule = context.getProperty(EXPIRATION_RULE_ID).evaluateAttributeExpressions(ff).getValue();
|
||||
if ( expirationRule != null ) {
|
||||
objectMetadata.setExpirationTimeRuleId(expirationRule);
|
||||
}
|
||||
|
||||
final Map<String, String> userMetadata = new HashMap<>();
|
||||
for ( final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet() ) {
|
||||
if ( entry.getKey().isDynamic() ) {
|
||||
final String value = context.getProperty(entry.getKey()).evaluateAttributeExpressions(ff).getValue();
|
||||
userMetadata.put(entry.getKey().getName(), value);
|
||||
}
|
||||
}
|
||||
|
||||
if ( !userMetadata.isEmpty() ) {
|
||||
objectMetadata.setUserMetadata(userMetadata);
|
||||
}
|
||||
|
||||
final PutObjectRequest request = new PutObjectRequest(bucket, key, in, objectMetadata);
|
||||
request.setStorageClass(StorageClass.valueOf(context.getProperty(STORAGE_CLASS).getValue()));
|
||||
final AccessControlList acl = createACL(context, ff);
|
||||
if ( acl != null ) {
|
||||
request.setAccessControlList(acl);
|
||||
}
|
||||
|
||||
final PutObjectResult result = s3.putObject(request);
|
||||
if ( result.getVersionId() != null ) {
|
||||
attributes.put("s3.version", result.getVersionId());
|
||||
}
|
||||
|
||||
attributes.put("s3.etag", result.getETag());
|
||||
|
||||
final Date expiration = result.getExpirationTime();
|
||||
if ( expiration != null ) {
|
||||
attributes.put("s3.expiration", expiration.toString());
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
if ( !attributes.isEmpty() ) {
|
||||
flowFile = session.putAllAttributes(flowFile, attributes);
|
||||
}
|
||||
session.transfer(flowFile, REL_SUCCESS);
|
||||
|
||||
final String url = "http://" + bucket + ".s3.amazonaws.com/" + key;
|
||||
final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
|
||||
session.getProvenanceReporter().send(flowFile, url, millis);
|
||||
|
||||
getLogger().info("Successfully put {} to Amazon S3 in {} milliseconds", new Object[] {ff, millis});
|
||||
} catch (final ProcessException | AmazonClientException pe) {
|
||||
getLogger().error("Failed to put {} to Amazon S3 due to {}", new Object[] {flowFile, pe});
|
||||
session.transfer(flowFile, REL_FAILURE);
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,60 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.processors.aws.sns;
|
||||
|
||||
import org.apache.nifi.components.AllowableValue;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.processor.ProcessContext;
|
||||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
import org.apache.nifi.processors.aws.AbstractAWSProcessor;
|
||||
|
||||
import com.amazonaws.ClientConfiguration;
|
||||
import com.amazonaws.auth.AWSCredentials;
|
||||
import com.amazonaws.services.sns.AmazonSNSClient;
|
||||
|
||||
public abstract class AbstractSNSProcessor extends AbstractAWSProcessor<AmazonSNSClient> {
|
||||
|
||||
protected static final AllowableValue ARN_TYPE_TOPIC =
|
||||
new AllowableValue("Topic ARN", "Topic ARN", "The ARN is the name of a topic");
|
||||
protected static final AllowableValue ARN_TYPE_TARGET =
|
||||
new AllowableValue("Target ARN", "Target ARN", "The ARN is the name of a particular Target, used to notify a specific subscriber");
|
||||
|
||||
public static final PropertyDescriptor ARN = new PropertyDescriptor.Builder()
|
||||
.name("Amazon Resource Name (ARN)")
|
||||
.description("The name of the resource to which notifications should be published")
|
||||
.expressionLanguageSupported(true)
|
||||
.required(true)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor ARN_TYPE = new PropertyDescriptor.Builder()
|
||||
.name("ARN Type")
|
||||
.description("The type of Amazon Resource Name that is being used.")
|
||||
.expressionLanguageSupported(false)
|
||||
.required(true)
|
||||
.allowableValues(ARN_TYPE_TOPIC, ARN_TYPE_TARGET)
|
||||
.defaultValue(ARN_TYPE_TOPIC.getValue())
|
||||
.build();
|
||||
|
||||
|
||||
|
||||
@Override
|
||||
protected AmazonSNSClient createClient(final ProcessContext context, final AWSCredentials credentials, final ClientConfiguration config) {
|
||||
return new AmazonSNSClient(credentials, config);
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,155 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.processors.aws.sns;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.nio.charset.Charset;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.nifi.annotation.behavior.SupportsBatching;
|
||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||
import org.apache.nifi.annotation.documentation.SeeAlso;
|
||||
import org.apache.nifi.annotation.documentation.Tags;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.flowfile.FlowFile;
|
||||
import org.apache.nifi.processor.ProcessContext;
|
||||
import org.apache.nifi.processor.ProcessSession;
|
||||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
import org.apache.nifi.processors.aws.sqs.GetSQS;
|
||||
import org.apache.nifi.processors.aws.sqs.PutSQS;
|
||||
|
||||
import com.amazonaws.services.sns.AmazonSNSClient;
|
||||
import com.amazonaws.services.sns.model.MessageAttributeValue;
|
||||
import com.amazonaws.services.sns.model.PublishRequest;
|
||||
|
||||
@SupportsBatching
|
||||
@SeeAlso({GetSQS.class, PutSQS.class})
|
||||
@Tags({"amazon", "aws", "sns", "topic", "put", "publish", "pubsub"})
|
||||
@CapabilityDescription("Sends the content of a FlowFile as a notification to the Amazon Simple Notification Service")
|
||||
public class PutSNS extends AbstractSNSProcessor {
|
||||
|
||||
public static final PropertyDescriptor CHARACTER_ENCODING = new PropertyDescriptor.Builder()
|
||||
.name("Character Set")
|
||||
.description("The character set in which the FlowFile's content is encoded")
|
||||
.defaultValue("UTF-8")
|
||||
.expressionLanguageSupported(true)
|
||||
.addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
|
||||
.required(true)
|
||||
.build();
|
||||
public static final PropertyDescriptor USE_JSON_STRUCTURE = new PropertyDescriptor.Builder()
|
||||
.name("Use JSON Structure")
|
||||
.description("If true, the contents of the FlowFile must be JSON with a top-level element named 'default'. Additional elements can be used to send different messages to different protocols. See the Amazon SNS Documentation for more information.")
|
||||
.defaultValue("false")
|
||||
.allowableValues("true", "false")
|
||||
.required(true)
|
||||
.build();
|
||||
public static final PropertyDescriptor SUBJECT = new PropertyDescriptor.Builder()
|
||||
.name("E-mail Subject")
|
||||
.description("The optional subject to use for any subscribers that are subscribed via E-mail")
|
||||
.expressionLanguageSupported(true)
|
||||
.required(false)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.build();
|
||||
|
||||
public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
|
||||
Arrays.asList(ARN, ARN_TYPE, SUBJECT, REGION, ACCESS_KEY, SECRET_KEY, CREDENTAILS_FILE, TIMEOUT,
|
||||
USE_JSON_STRUCTURE, CHARACTER_ENCODING) );
|
||||
|
||||
public static final int MAX_SIZE = 256 * 1024;
|
||||
|
||||
@Override
|
||||
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||
return properties;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
|
||||
return new PropertyDescriptor.Builder()
|
||||
.name(propertyDescriptorName)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.expressionLanguageSupported(true)
|
||||
.required(false)
|
||||
.dynamic(true)
|
||||
.build();
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void onTrigger(final ProcessContext context, final ProcessSession session) {
|
||||
FlowFile flowFile = session.get();
|
||||
if ( flowFile == null ) {
|
||||
return;
|
||||
}
|
||||
|
||||
if ( flowFile.getSize() > MAX_SIZE ) {
|
||||
getLogger().error("Cannot publish {} to SNS because its size exceeds Amazon SNS's limit of 256KB; routing to failure", new Object[] {flowFile});
|
||||
session.transfer(flowFile, REL_FAILURE);
|
||||
return;
|
||||
}
|
||||
|
||||
final Charset charset = Charset.forName(context.getProperty(CHARACTER_ENCODING).evaluateAttributeExpressions(flowFile).getValue());
|
||||
|
||||
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
|
||||
session.exportTo(flowFile, baos);
|
||||
final String message = new String(baos.toByteArray(), charset);
|
||||
|
||||
final AmazonSNSClient client = getClient();
|
||||
final PublishRequest request = new PublishRequest();
|
||||
request.setMessage(message);
|
||||
|
||||
if ( context.getProperty(USE_JSON_STRUCTURE).asBoolean() ) {
|
||||
request.setMessageStructure("json");
|
||||
}
|
||||
|
||||
final String arn = context.getProperty(ARN).evaluateAttributeExpressions(flowFile).getValue();
|
||||
final String arnType = context.getProperty(ARN_TYPE).getValue();
|
||||
if ( arnType.equalsIgnoreCase(ARN_TYPE_TOPIC.getValue()) ) {
|
||||
request.setTopicArn(arn);
|
||||
} else {
|
||||
request.setTargetArn(arn);
|
||||
}
|
||||
|
||||
final String subject = context.getProperty(SUBJECT).evaluateAttributeExpressions(flowFile).getValue();
|
||||
if ( subject != null ) {
|
||||
request.setSubject(subject);
|
||||
}
|
||||
|
||||
for ( final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet() ) {
|
||||
if ( entry.getKey().isDynamic() && !isEmpty(entry.getValue()) ) {
|
||||
final MessageAttributeValue value = new MessageAttributeValue();
|
||||
value.setStringValue(context.getProperty(entry.getKey()).evaluateAttributeExpressions(flowFile).getValue());
|
||||
value.setDataType("String");
|
||||
request.addMessageAttributesEntry(entry.getKey().getName(), value);
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
client.publish(request);
|
||||
session.transfer(flowFile, REL_SUCCESS);
|
||||
session.getProvenanceReporter().send(flowFile, arn);
|
||||
getLogger().info("Successfully published notification for {}", new Object[] {flowFile});
|
||||
} catch (final Exception e) {
|
||||
getLogger().error("Failed to publish Amazon SNS message for {} due to {}", new Object[] {flowFile, e});
|
||||
session.transfer(flowFile, REL_FAILURE);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,51 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.processors.aws.sqs;
|
||||
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.processor.ProcessContext;
|
||||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
import org.apache.nifi.processors.aws.AbstractAWSProcessor;
|
||||
|
||||
import com.amazonaws.ClientConfiguration;
|
||||
import com.amazonaws.auth.AWSCredentials;
|
||||
import com.amazonaws.services.sqs.AmazonSQSClient;
|
||||
|
||||
public abstract class AbstractSQSProcessor extends AbstractAWSProcessor<AmazonSQSClient> {
|
||||
|
||||
public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
|
||||
.name("Batch Size")
|
||||
.description("The maximum number of messages to send in a single network request")
|
||||
.required(true)
|
||||
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
|
||||
.defaultValue("25")
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor QUEUE_URL = new PropertyDescriptor.Builder()
|
||||
.name("Queue URL")
|
||||
.description("The URL of the queue to act upon")
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.expressionLanguageSupported(true)
|
||||
.required(true)
|
||||
.build();
|
||||
|
||||
@Override
|
||||
protected AmazonSQSClient createClient(final ProcessContext context, final AWSCredentials credentials, final ClientConfiguration config) {
|
||||
return new AmazonSQSClient(credentials, config);
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,95 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.processors.aws.sqs;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.nifi.annotation.behavior.SupportsBatching;
|
||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||
import org.apache.nifi.annotation.documentation.SeeAlso;
|
||||
import org.apache.nifi.annotation.documentation.Tags;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.flowfile.FlowFile;
|
||||
import org.apache.nifi.processor.ProcessContext;
|
||||
import org.apache.nifi.processor.ProcessSession;
|
||||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
|
||||
import com.amazonaws.services.sqs.AmazonSQSClient;
|
||||
import com.amazonaws.services.sqs.model.DeleteMessageBatchRequest;
|
||||
import com.amazonaws.services.sqs.model.DeleteMessageBatchRequestEntry;
|
||||
|
||||
@SupportsBatching
|
||||
@SeeAlso({GetSQS.class, PutSQS.class})
|
||||
@Tags({"Amazon", "AWS", "SQS", "Queue", "Delete"})
|
||||
@CapabilityDescription("Deletes a message from an Amazon Simple Queuing Service Queue")
|
||||
public class DeleteSQS extends AbstractSQSProcessor {
|
||||
public static final PropertyDescriptor RECEIPT_HANDLE = new PropertyDescriptor.Builder()
|
||||
.name("Receipt Handle")
|
||||
.description("The identifier that specifies the receipt of the message")
|
||||
.expressionLanguageSupported(true)
|
||||
.required(true)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.defaultValue("${sqs.receipt.handle}")
|
||||
.build();
|
||||
|
||||
public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
|
||||
Arrays.asList(ACCESS_KEY, SECRET_KEY, REGION, QUEUE_URL, TIMEOUT) );
|
||||
|
||||
@Override
|
||||
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||
return properties;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void onTrigger(final ProcessContext context, final ProcessSession session) {
|
||||
List<FlowFile> flowFiles = session.get(1);
|
||||
if ( flowFiles.isEmpty() ) {
|
||||
return;
|
||||
}
|
||||
|
||||
final FlowFile firstFlowFile = flowFiles.get(0);
|
||||
final String queueUrl = context.getProperty(QUEUE_URL).evaluateAttributeExpressions(firstFlowFile).getValue();
|
||||
|
||||
final AmazonSQSClient client = getClient();
|
||||
final DeleteMessageBatchRequest request = new DeleteMessageBatchRequest();
|
||||
request.setQueueUrl(queueUrl);
|
||||
|
||||
final List<DeleteMessageBatchRequestEntry> entries = new ArrayList<>(flowFiles.size());
|
||||
|
||||
for ( final FlowFile flowFile : flowFiles ) {
|
||||
final DeleteMessageBatchRequestEntry entry = new DeleteMessageBatchRequestEntry();
|
||||
entry.setReceiptHandle(context.getProperty(RECEIPT_HANDLE).evaluateAttributeExpressions(flowFile).getValue());
|
||||
entries.add(entry);
|
||||
}
|
||||
|
||||
request.setEntries(entries);
|
||||
|
||||
try {
|
||||
client.deleteMessageBatch(request);
|
||||
getLogger().info("Successfully deleted {} objects from SQS", new Object[] {flowFiles.size()});
|
||||
session.transfer(flowFiles, REL_SUCCESS);
|
||||
} catch (final Exception e) {
|
||||
getLogger().error("Failed to delete {} objects from SQS due to {}", new Object[] {flowFiles.size(), e});
|
||||
session.transfer(flowFiles, REL_FAILURE);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,204 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.processors.aws.sqs;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.nio.charset.Charset;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.nifi.annotation.behavior.SupportsBatching;
|
||||
import org.apache.nifi.annotation.behavior.WritesAttribute;
|
||||
import org.apache.nifi.annotation.behavior.WritesAttributes;
|
||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||
import org.apache.nifi.annotation.documentation.SeeAlso;
|
||||
import org.apache.nifi.annotation.documentation.Tags;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.flowfile.FlowFile;
|
||||
import org.apache.nifi.processor.ProcessContext;
|
||||
import org.apache.nifi.processor.ProcessSession;
|
||||
import org.apache.nifi.processor.Relationship;
|
||||
import org.apache.nifi.processor.io.OutputStreamCallback;
|
||||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
|
||||
import com.amazonaws.services.sqs.AmazonSQSClient;
|
||||
import com.amazonaws.services.sqs.model.DeleteMessageBatchRequest;
|
||||
import com.amazonaws.services.sqs.model.DeleteMessageBatchRequestEntry;
|
||||
import com.amazonaws.services.sqs.model.Message;
|
||||
import com.amazonaws.services.sqs.model.MessageAttributeValue;
|
||||
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
|
||||
import com.amazonaws.services.sqs.model.ReceiveMessageResult;
|
||||
|
||||
@SupportsBatching
|
||||
@Tags({ "Amazon", "AWS", "SQS", "Queue", "Get", "Fetch", "Poll"})
|
||||
@SeeAlso({PutSQS.class, DeleteSQS.class})
|
||||
@CapabilityDescription("Fetches messages from an Amazon Simple Queuing Service Queue")
|
||||
@WritesAttributes({
|
||||
@WritesAttribute(attribute="hash.value", description="The MD5 sum of the message"),
|
||||
@WritesAttribute(attribute="hash.algorithm", description="MD5"),
|
||||
@WritesAttribute(attribute="sqs.message.id", description="The unique identifier of the SQS message"),
|
||||
@WritesAttribute(attribute="sqs.receipt.handle", description="The SQS Receipt Handle that is to be used to delete the message from the queue")
|
||||
})
|
||||
public class GetSQS extends AbstractSQSProcessor {
|
||||
public static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
|
||||
.name("Character Set")
|
||||
.description("The Character Set that should be used to encode the textual content of the SQS message")
|
||||
.required(true)
|
||||
.defaultValue("UTF-8")
|
||||
.allowableValues(Charset.availableCharsets().keySet().toArray(new String[0]))
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor AUTO_DELETE = new PropertyDescriptor.Builder()
|
||||
.name("Auto Delete Messages")
|
||||
.description("Specifies whether the messages should be automatically deleted by the processors once they have been received.")
|
||||
.required(true)
|
||||
.allowableValues("true", "false")
|
||||
.defaultValue("true")
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor VISIBILITY_TIMEOUT = new PropertyDescriptor.Builder()
|
||||
.name("Visibility Timeout")
|
||||
.description("The amount of time after a message is received but not deleted that the message is hidden from other consumers")
|
||||
.expressionLanguageSupported(false)
|
||||
.required(true)
|
||||
.defaultValue("15 mins")
|
||||
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
|
||||
.name("Batch Size")
|
||||
.description("The maximum number of messages to send in a single network request")
|
||||
.required(true)
|
||||
.addValidator(StandardValidators.createLongValidator(1L, 10L, true))
|
||||
.defaultValue("10")
|
||||
.build();
|
||||
|
||||
|
||||
public static final PropertyDescriptor STATIC_QUEUE_URL = new PropertyDescriptor.Builder()
|
||||
.fromPropertyDescriptor(QUEUE_URL)
|
||||
.expressionLanguageSupported(false)
|
||||
.build();
|
||||
|
||||
public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
|
||||
Arrays.asList(STATIC_QUEUE_URL, AUTO_DELETE, ACCESS_KEY, SECRET_KEY, CREDENTAILS_FILE, REGION, BATCH_SIZE, TIMEOUT, CHARSET, VISIBILITY_TIMEOUT) );
|
||||
|
||||
@Override
|
||||
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||
return properties;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<Relationship> getRelationships() {
|
||||
return Collections.singleton(REL_SUCCESS);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onTrigger(final ProcessContext context, final ProcessSession session) {
|
||||
final String queueUrl = context.getProperty(STATIC_QUEUE_URL).getValue();
|
||||
|
||||
final AmazonSQSClient client = getClient();
|
||||
|
||||
final ReceiveMessageRequest request = new ReceiveMessageRequest();
|
||||
request.setAttributeNames(Collections.singleton("All"));
|
||||
request.setMaxNumberOfMessages(context.getProperty(BATCH_SIZE).asInteger());
|
||||
request.setVisibilityTimeout(context.getProperty(VISIBILITY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue());
|
||||
request.setQueueUrl(queueUrl);
|
||||
|
||||
final Charset charset = Charset.forName(context.getProperty(CHARSET).getValue());
|
||||
|
||||
final ReceiveMessageResult result;
|
||||
try {
|
||||
result = client.receiveMessage(request);
|
||||
} catch (final Exception e) {
|
||||
getLogger().error("Failed to receive messages from Amazon SQS due to {}", new Object[] {e});
|
||||
context.yield();
|
||||
return;
|
||||
}
|
||||
|
||||
final List<Message> messages = result.getMessages();
|
||||
if ( messages.isEmpty() ) {
|
||||
context.yield();
|
||||
return;
|
||||
}
|
||||
|
||||
final boolean autoDelete = context.getProperty(AUTO_DELETE).asBoolean();
|
||||
|
||||
for ( final Message message : messages ) {
|
||||
FlowFile flowFile = session.create();
|
||||
|
||||
final Map<String, String> attributes = new HashMap<>();
|
||||
for ( final Map.Entry<String, String> entry : message.getAttributes().entrySet() ) {
|
||||
attributes.put("sqs." + entry.getKey(), entry.getValue());
|
||||
}
|
||||
|
||||
for ( final Map.Entry<String, MessageAttributeValue> entry : message.getMessageAttributes().entrySet() ) {
|
||||
attributes.put("sqs." + entry.getKey(), entry.getValue().getStringValue());
|
||||
}
|
||||
|
||||
attributes.put("hash.value", message.getMD5OfBody());
|
||||
attributes.put("hash.algorithm", "md5");
|
||||
attributes.put("sqs.message.id", message.getMessageId());
|
||||
attributes.put("sqs.receipt.handle", message.getReceiptHandle());
|
||||
|
||||
flowFile = session.putAllAttributes(flowFile, attributes);
|
||||
flowFile = session.write(flowFile, new OutputStreamCallback() {
|
||||
@Override
|
||||
public void process(final OutputStream out) throws IOException {
|
||||
out.write(message.getBody().getBytes(charset));
|
||||
}
|
||||
});
|
||||
|
||||
session.transfer(flowFile, REL_SUCCESS);
|
||||
session.getProvenanceReporter().receive(flowFile, queueUrl);
|
||||
|
||||
getLogger().info("Successfully received {} from Amazon SQS", new Object[] {flowFile});
|
||||
}
|
||||
|
||||
if ( autoDelete ) {
|
||||
// If we want to auto-delete messages, we must fist commit the session to ensure that the data
|
||||
// is persisted in NiFi's repositories.
|
||||
session.commit();
|
||||
|
||||
final DeleteMessageBatchRequest deleteRequest = new DeleteMessageBatchRequest();
|
||||
deleteRequest.setQueueUrl(queueUrl);
|
||||
final List<DeleteMessageBatchRequestEntry> deleteRequestEntries = new ArrayList<>();
|
||||
for ( final Message message : messages ) {
|
||||
final DeleteMessageBatchRequestEntry entry = new DeleteMessageBatchRequestEntry();
|
||||
entry.setId(message.getMessageId());
|
||||
entry.setReceiptHandle(message.getReceiptHandle());
|
||||
deleteRequestEntries.add(entry);
|
||||
}
|
||||
|
||||
deleteRequest.setEntries(deleteRequestEntries);
|
||||
|
||||
try {
|
||||
client.deleteMessageBatch(deleteRequest);
|
||||
} catch (final Exception e) {
|
||||
getLogger().error("Received {} messages from Amazon SQS but failed to delete the messages; these messages may be duplicated. Reason for deletion failure: {}", new Object[] {messages.size(), e});
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,147 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.processors.aws.sqs;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.nifi.annotation.behavior.DynamicProperty;
|
||||
import org.apache.nifi.annotation.behavior.SupportsBatching;
|
||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||
import org.apache.nifi.annotation.documentation.SeeAlso;
|
||||
import org.apache.nifi.annotation.documentation.Tags;
|
||||
import org.apache.nifi.annotation.lifecycle.OnScheduled;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.flowfile.FlowFile;
|
||||
import org.apache.nifi.processor.ProcessContext;
|
||||
import org.apache.nifi.processor.ProcessSession;
|
||||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
|
||||
import com.amazonaws.services.sqs.AmazonSQSClient;
|
||||
import com.amazonaws.services.sqs.model.MessageAttributeValue;
|
||||
import com.amazonaws.services.sqs.model.SendMessageBatchRequest;
|
||||
import com.amazonaws.services.sqs.model.SendMessageBatchRequestEntry;
|
||||
|
||||
|
||||
@SupportsBatching
|
||||
@Tags({"Amazon", "AWS", "SQS", "Queue", "Put", "Publish"})
|
||||
@SeeAlso({GetSQS.class, DeleteSQS.class})
|
||||
@CapabilityDescription("Publishes a message to an Amazon Simple Queuing Service Queue")
|
||||
@DynamicProperty(name="The name of a Message Attribute to add to the message", value="The value of the Message Attribute",
|
||||
description="Allows the user to add key/value pairs as Message Attributes by adding a property whose name will become the name of "
|
||||
+ "the Message Attribute and value will become the value of the Message Attribute", supportsExpressionLanguage=true)
|
||||
public class PutSQS extends AbstractSQSProcessor {
|
||||
|
||||
public static final PropertyDescriptor DELAY = new PropertyDescriptor.Builder()
|
||||
.name("Delay")
|
||||
.description("The amount of time to delay the message before it becomes available to consumers")
|
||||
.required(true)
|
||||
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
|
||||
.defaultValue("0 secs")
|
||||
.build();
|
||||
|
||||
public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
|
||||
Arrays.asList(QUEUE_URL, ACCESS_KEY, SECRET_KEY, CREDENTAILS_FILE, REGION, DELAY, TIMEOUT) );
|
||||
|
||||
private volatile List<PropertyDescriptor> userDefinedProperties = Collections.emptyList();
|
||||
|
||||
@Override
|
||||
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||
return properties;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
|
||||
return new PropertyDescriptor.Builder()
|
||||
.name(propertyDescriptorName)
|
||||
.expressionLanguageSupported(true)
|
||||
.required(false)
|
||||
.dynamic(true)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.build();
|
||||
}
|
||||
|
||||
@OnScheduled
|
||||
public void setup(final ProcessContext context) {
|
||||
userDefinedProperties = new ArrayList<>();
|
||||
for ( final PropertyDescriptor descriptor : context.getProperties().keySet() ) {
|
||||
if ( descriptor.isDynamic() ) {
|
||||
userDefinedProperties.add(descriptor);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onTrigger(final ProcessContext context, final ProcessSession session) {
|
||||
FlowFile flowFile = session.get();
|
||||
if ( flowFile == null ) {
|
||||
return;
|
||||
}
|
||||
|
||||
final long startNanos = System.nanoTime();
|
||||
final AmazonSQSClient client = getClient();
|
||||
final SendMessageBatchRequest request = new SendMessageBatchRequest();
|
||||
final String queueUrl = context.getProperty(QUEUE_URL).evaluateAttributeExpressions(flowFile).getValue();
|
||||
request.setQueueUrl(queueUrl);
|
||||
|
||||
final Set<SendMessageBatchRequestEntry> entries = new HashSet<>();
|
||||
|
||||
final SendMessageBatchRequestEntry entry = new SendMessageBatchRequestEntry();
|
||||
entry.setId(flowFile.getAttribute("uuid"));
|
||||
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
|
||||
session.exportTo(flowFile, baos);
|
||||
final String flowFileContent = baos.toString();
|
||||
entry.setMessageBody(flowFileContent);
|
||||
|
||||
final Map<String, MessageAttributeValue> messageAttributes = new HashMap<>();
|
||||
|
||||
for ( final PropertyDescriptor descriptor : userDefinedProperties ) {
|
||||
final MessageAttributeValue mav = new MessageAttributeValue();
|
||||
mav.setDataType("String");
|
||||
mav.setStringValue(context.getProperty(descriptor).evaluateAttributeExpressions(flowFile).getValue());
|
||||
messageAttributes.put(descriptor.getName(), mav);
|
||||
}
|
||||
|
||||
entry.setMessageAttributes(messageAttributes);
|
||||
entry.setDelaySeconds(context.getProperty(DELAY).asTimePeriod(TimeUnit.SECONDS).intValue());
|
||||
entries.add(entry);
|
||||
|
||||
request.setEntries(entries);
|
||||
|
||||
try {
|
||||
client.sendMessageBatch(request);
|
||||
} catch (final Exception e) {
|
||||
getLogger().error("Failed to send messages to Amazon SQS due to {}; routing to failure", new Object[] {e});
|
||||
session.transfer(flowFile, REL_FAILURE);
|
||||
return;
|
||||
}
|
||||
|
||||
getLogger().info("Successfully published message to Amazon SQS for {}", new Object[] {flowFile});
|
||||
session.transfer(flowFile, REL_SUCCESS);
|
||||
final long transmissionMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
|
||||
session.getProvenanceReporter().send(flowFile, queueUrl, transmissionMillis);
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,20 @@
|
||||
# Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
# contributor license agreements. See the NOTICE file distributed with
|
||||
# this work for additional information regarding copyright ownership.
|
||||
# The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
# (the "License"); you may not use this file except in compliance with
|
||||
# the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
org.apache.nifi.processors.aws.s3.FetchS3Object
|
||||
org.apache.nifi.processors.aws.s3.PutS3Object
|
||||
org.apache.nifi.processors.aws.sns.PutSNS
|
||||
org.apache.nifi.processors.aws.sqs.GetSQS
|
||||
org.apache.nifi.processors.aws.sqs.PutSQS
|
||||
org.apache.nifi.processors.aws.sqs.DeleteSQS
|
@ -0,0 +1,44 @@
|
||||
package org.apache.nifi.processors.aws.s3;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Paths;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.nifi.util.MockFlowFile;
|
||||
import org.apache.nifi.util.TestRunner;
|
||||
import org.apache.nifi.util.TestRunners;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
|
||||
@Ignore("For local testing only - interacts with S3 so the credentials file must be configured and all necessary buckets created")
|
||||
public class TestFetchS3Object {
|
||||
private final String CREDENTIALS_FILE = System.getProperty("user.home") + "/aws-credentials.properties";
|
||||
|
||||
@Test
|
||||
public void testGet() throws IOException {
|
||||
final TestRunner runner = TestRunners.newTestRunner(new FetchS3Object());
|
||||
runner.setProperty(FetchS3Object.BUCKET, "anonymous-test-bucket-00000000");
|
||||
runner.setProperty(FetchS3Object.CREDENTAILS_FILE, CREDENTIALS_FILE);
|
||||
runner.setProperty(FetchS3Object.KEY, "folder/1.txt");
|
||||
|
||||
final Map<String, String> attrs = new HashMap<>();
|
||||
attrs.put("start", "0");
|
||||
|
||||
runner.enqueue(new byte[0], attrs);
|
||||
runner.run(1);
|
||||
|
||||
runner.assertAllFlowFilesTransferred(FetchS3Object.REL_SUCCESS, 1);
|
||||
final List<MockFlowFile> ffs = runner.getFlowFilesForRelationship(FetchS3Object.REL_SUCCESS);
|
||||
final MockFlowFile out = ffs.iterator().next();
|
||||
|
||||
final byte[] expectedBytes = Files.readAllBytes(Paths.get("src/test/resources/hello.txt"));
|
||||
out.assertContentEquals(new String(expectedBytes));
|
||||
for ( final Map.Entry<String, String> entry : out.getAttributes().entrySet() ) {
|
||||
System.out.println(entry.getKey() + " : " + entry.getValue());
|
||||
}
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,87 @@
|
||||
package org.apache.nifi.processors.aws.s3;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Paths;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.nifi.util.TestRunner;
|
||||
import org.apache.nifi.util.TestRunners;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
|
||||
import com.amazonaws.services.s3.model.StorageClass;
|
||||
|
||||
@Ignore("For local testing only - interacts with S3 so the credentials file must be configured and all necessary buckets created")
|
||||
public class TestPutS3Object {
|
||||
|
||||
private final String CREDENTIALS_FILE = System.getProperty("user.home") + "/aws-credentials.properties";
|
||||
|
||||
@Test
|
||||
public void testSimplePut() throws IOException {
|
||||
final TestRunner runner = TestRunners.newTestRunner(new PutS3Object());
|
||||
runner.setProperty(PutS3Object.CREDENTAILS_FILE, CREDENTIALS_FILE);
|
||||
runner.setProperty(PutS3Object.BUCKET, "test-bucket-00000000-0000-0000-0000-123456789012");
|
||||
runner.setProperty(PutS3Object.EXPIRATION_RULE_ID, "Expire Quickly");
|
||||
Assert.assertTrue( runner.setProperty("x-custom-prop", "hello").isValid() );
|
||||
|
||||
for (int i=0; i < 3; i++) {
|
||||
final Map<String, String> attrs = new HashMap<>();
|
||||
attrs.put("filename", String.valueOf(i) + ".txt");
|
||||
runner.enqueue(Paths.get("src/test/resources/hello.txt"), attrs);
|
||||
}
|
||||
runner.run(3);
|
||||
|
||||
runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS, 3);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPutInFolder() throws IOException {
|
||||
final TestRunner runner = TestRunners.newTestRunner(new PutS3Object());
|
||||
runner.setProperty(PutS3Object.BUCKET, "test-bucket-00000000-0000-0000-0000-123456789012");
|
||||
runner.setProperty(PutS3Object.CREDENTAILS_FILE, CREDENTIALS_FILE);
|
||||
runner.setProperty(PutS3Object.EXPIRATION_RULE_ID, "Expire Quickly");
|
||||
Assert.assertTrue( runner.setProperty("x-custom-prop", "hello").isValid() );
|
||||
|
||||
final Map<String, String> attrs = new HashMap<>();
|
||||
attrs.put("filename", "folder/1.txt");
|
||||
runner.enqueue(Paths.get("src/test/resources/hello.txt"), attrs);
|
||||
runner.run();
|
||||
|
||||
runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS, 1);
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testStorageClass() throws IOException {
|
||||
final TestRunner runner = TestRunners.newTestRunner(new PutS3Object());
|
||||
runner.setProperty(PutS3Object.BUCKET, "test-bucket-00000000-0000-0000-0000-123456789012");
|
||||
runner.setProperty(PutS3Object.CREDENTAILS_FILE, CREDENTIALS_FILE);
|
||||
runner.setProperty(PutS3Object.STORAGE_CLASS, StorageClass.ReducedRedundancy.name());
|
||||
Assert.assertTrue( runner.setProperty("x-custom-prop", "hello").isValid() );
|
||||
|
||||
final Map<String, String> attrs = new HashMap<>();
|
||||
attrs.put("filename", "folder/2.txt");
|
||||
runner.enqueue(Paths.get("src/test/resources/hello.txt"), attrs);
|
||||
runner.run();
|
||||
|
||||
runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS, 1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPermissions() throws IOException {
|
||||
final TestRunner runner = TestRunners.newTestRunner(new PutS3Object());
|
||||
runner.setProperty(PutS3Object.BUCKET, "test-bucket-00000000-0000-0000-0000-123456789012");
|
||||
runner.setProperty(PutS3Object.CREDENTAILS_FILE, CREDENTIALS_FILE);
|
||||
runner.setProperty(PutS3Object.FULL_CONTROL_USER_LIST, "28545acd76c35c7e91f8409b95fd1aa0c0914bfa1ac60975d9f48bc3c5e090b5");
|
||||
|
||||
final Map<String, String> attrs = new HashMap<>();
|
||||
attrs.put("filename", "folder/4.txt");
|
||||
runner.enqueue(Paths.get("src/test/resources/hello.txt"), attrs);
|
||||
runner.run();
|
||||
|
||||
runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS, 1);
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,34 @@
|
||||
package org.apache.nifi.processors.aws.sns;
|
||||
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Paths;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.nifi.util.TestRunner;
|
||||
import org.apache.nifi.util.TestRunners;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
|
||||
@Ignore("For local testing only - interacts with S3 so the credentials file must be configured and all necessary buckets created")
|
||||
public class TestPutSNS {
|
||||
private final String CREDENTIALS_FILE = System.getProperty("user.home") + "/aws-credentials.properties";
|
||||
|
||||
@Test
|
||||
public void testPublish() throws IOException {
|
||||
final TestRunner runner = TestRunners.newTestRunner(new PutSNS());
|
||||
runner.setProperty(PutSNS.CREDENTAILS_FILE, CREDENTIALS_FILE);
|
||||
runner.setProperty(PutSNS.ARN, "arn:aws:sns:us-west-2:100515378163:test-topic-1");
|
||||
assertTrue( runner.setProperty("DynamicProperty", "hello!").isValid() );
|
||||
|
||||
final Map<String, String> attrs = new HashMap<>();
|
||||
attrs.put("filename", "1.txt");
|
||||
runner.enqueue(Paths.get("src/test/resources/hello.txt"), attrs);
|
||||
runner.run();
|
||||
|
||||
runner.assertAllFlowFilesTransferred(PutSNS.REL_SUCCESS, 1);
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,32 @@
|
||||
package org.apache.nifi.processors.aws.sqs;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.nifi.processors.aws.sns.PutSNS;
|
||||
import org.apache.nifi.util.MockFlowFile;
|
||||
import org.apache.nifi.util.TestRunner;
|
||||
import org.apache.nifi.util.TestRunners;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
|
||||
@Ignore("For local testing only - interacts with S3 so the credentials file must be configured and all necessary buckets created")
|
||||
public class TestGetSQS {
|
||||
private final String CREDENTIALS_FILE = System.getProperty("user.home") + "/aws-credentials.properties";
|
||||
|
||||
@Test
|
||||
public void testSimpleGet() {
|
||||
final TestRunner runner = TestRunners.newTestRunner(new GetSQS());
|
||||
runner.setProperty(PutSNS.CREDENTAILS_FILE, CREDENTIALS_FILE);
|
||||
runner.setProperty(GetSQS.TIMEOUT, "30 secs");
|
||||
runner.setProperty(GetSQS.QUEUE_URL, "https://sqs.us-west-2.amazonaws.com/100515378163/test-queue-000000000");
|
||||
|
||||
runner.run(1);
|
||||
|
||||
final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(GetSQS.REL_SUCCESS);
|
||||
for ( final MockFlowFile mff : flowFiles ) {
|
||||
System.out.println(mff.getAttributes());
|
||||
System.out.println(new String(mff.toByteArray()));
|
||||
}
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,35 @@
|
||||
package org.apache.nifi.processors.aws.sqs;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Paths;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.nifi.processors.aws.sns.PutSNS;
|
||||
import org.apache.nifi.util.TestRunner;
|
||||
import org.apache.nifi.util.TestRunners;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
|
||||
@Ignore("For local testing only - interacts with S3 so the credentials file must be configured and all necessary buckets created")
|
||||
public class TestPutSQS {
|
||||
private final String CREDENTIALS_FILE = System.getProperty("user.home") + "/aws-credentials.properties";
|
||||
|
||||
@Test
|
||||
public void testSimplePut() throws IOException {
|
||||
final TestRunner runner = TestRunners.newTestRunner(new PutSQS());
|
||||
runner.setProperty(PutSNS.CREDENTAILS_FILE, CREDENTIALS_FILE);
|
||||
runner.setProperty(PutSQS.TIMEOUT, "30 secs");
|
||||
runner.setProperty(PutSQS.QUEUE_URL, "https://sqs.us-west-2.amazonaws.com/100515378163/test-queue-000000000");
|
||||
Assert.assertTrue( runner.setProperty("x-custom-prop", "hello").isValid() );
|
||||
|
||||
final Map<String, String> attrs = new HashMap<>();
|
||||
attrs.put("filename", "1.txt");
|
||||
runner.enqueue(Paths.get("src/test/resources/hello.txt"), attrs);
|
||||
runner.run(1);
|
||||
|
||||
runner.assertAllFlowFilesTransferred(PutSQS.REL_SUCCESS, 1);
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1 @@
|
||||
Hello, World!!
|
43
nifi/nifi-nar-bundles/nifi-aws-bundle/pom.xml
Normal file
43
nifi/nifi-nar-bundles/nifi-aws-bundle/pom.xml
Normal file
@ -0,0 +1,43 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
contributor license agreements. See the NOTICE file distributed with
|
||||
this work for additional information regarding copyright ownership.
|
||||
The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
(the "License"); you may not use this file except in compliance with
|
||||
the License. You may obtain a copy of the License at
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
-->
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<parent>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-nar-bundles</artifactId>
|
||||
<version>0.1.0-incubating-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<artifactId>nifi-aws-bundle</artifactId>
|
||||
<packaging>pom</packaging>
|
||||
|
||||
<modules>
|
||||
<module>nifi-aws-processors</module>
|
||||
<module>nifi-aws-nar</module>
|
||||
</modules>
|
||||
|
||||
<dependencyManagement>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>com.amazonaws</groupId>
|
||||
<artifactId>aws-java-sdk</artifactId>
|
||||
<version>1.9.24</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</dependencyManagement>
|
||||
|
||||
</project>
|
@ -12,8 +12,7 @@
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
-->
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
|
||||
--><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<parent>
|
||||
@ -35,6 +34,7 @@
|
||||
<module>nifi-update-attribute-bundle</module>
|
||||
<module>nifi-kafka-bundle</module>
|
||||
<module>nifi-kite-bundle</module>
|
||||
<module>nifi-aws-bundle</module>
|
||||
<module>nifi-social-media-bundle</module>
|
||||
<module>nifi-geo-bundle</module>
|
||||
<module>nifi-hl7-bundle</module>
|
||||
|
Loading…
x
Reference in New Issue
Block a user