nifi-992 Adding nifi-couchbase-bundle.

- new CouchbaseClusterControllerService
- new Processors
  - GetCouchbaseKey
  - PutCouchbaseKey

Signed-off-by: Bryan Bende <bbende@apache.org>
This commit is contained in:
ijokarumawak 2015-09-26 02:46:37 +09:00 committed by Bryan Bende
parent 96764ed6a1
commit 2466a24530
19 changed files with 1643 additions and 0 deletions

View File

@ -709,6 +709,16 @@ The following binary components are provided under the Apache Software License v
Metadata-Extractor
Copyright 2002-2015 Drew Noakes
(ASLv2) Couchbase Java SDK
The following NOTICE information applies:
Couchbase Java SDK
Copyright 2014 Couchbase, Inc.
(ASLv2) RxJava
The following NOTICE information applies:
Couchbase Java SDK
Copyright 2012 Netflix, Inc.
************************
Common Development and Distribution License 1.1
************************

View File

@ -227,6 +227,11 @@ language governing permissions and limitations under the License. -->
<artifactId>nifi-image-nar</artifactId>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-couchbase-nar</artifactId>
<type>nar</type>
</dependency>
</dependencies>
<properties>

View File

@ -0,0 +1,37 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-couchbase-bundle</artifactId>
<version>0.3.1-SNAPSHOT</version>
</parent>
<artifactId>nifi-couchbase-nar</artifactId>
<version>0.3.1-SNAPSHOT</version>
<packaging>nar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-couchbase-processors</artifactId>
<version>0.3.1-SNAPSHOT</version>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,208 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-couchbase-bundle</artifactId>
<version>0.3.1-SNAPSHOT</version>
</parent>
<artifactId>nifi-couchbase-processors</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-processor-utils</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.couchbase.client</groupId>
<artifactId>java-client</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<pluginManagement>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
<version>2.15</version>
<dependencies>
<dependency>
<groupId>com.puppycrawl.tools</groupId>
<artifactId>checkstyle</artifactId>
<version>6.5</version>
</dependency>
</dependencies>
</plugin>
</plugins>
</pluginManagement>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
<configuration>
<checkstyleRules>
<module name="Checker">
<property name="charset" value="UTF-8" />
<property name="severity" value="warning" />
<!-- Checks for whitespace -->
<!-- See http://checkstyle.sf.net/config_whitespace.html -->
<module name="FileTabCharacter">
<property name="eachLine" value="true" />
</module>
<module name="TreeWalker">
<module name="RegexpSinglelineJava">
<property name="format" value="\s+$" />
<property name="message" value="Line has trailing whitespace." />
</module>
<module name="RegexpSinglelineJava">
<property name="format" value="[@]see\s+[{][@]link" />
<property name="message" value="Javadoc @see does not need @link: pick one or the other." />
</module>
<module name="OuterTypeFilename" />
<module name="LineLength">
<!-- needs extra, because Eclipse formatter ignores the ending left
brace -->
<property name="max" value="200" />
<property name="ignorePattern" value="^package.*|^import.*|a href|href|http://|https://|ftp://" />
</module>
<module name="AvoidStarImport" />
<module name="UnusedImports">
<property name="processJavadoc" value="true" />
</module>
<module name="NoLineWrap" />
<module name="LeftCurly">
<property name="maxLineLength" value="160" />
</module>
<module name="RightCurly" />
<module name="RightCurly">
<property name="option" value="alone" />
<property name="tokens" value="CLASS_DEF, METHOD_DEF, CTOR_DEF, LITERAL_FOR, LITERAL_WHILE, LITERAL_DO, STATIC_INIT, INSTANCE_INIT" />
</module>
<module name="SeparatorWrap">
<property name="tokens" value="DOT" />
<property name="option" value="nl" />
</module>
<module name="SeparatorWrap">
<property name="tokens" value="COMMA" />
<property name="option" value="EOL" />
</module>
<module name="PackageName">
<property name="format" value="^[a-z]+(\.[a-z][a-zA-Z0-9]*)*$" />
</module>
<module name="MethodTypeParameterName">
<property name="format" value="(^[A-Z][0-9]?)$|([A-Z][a-zA-Z0-9]*[T]$)" />
</module>
<module name="MethodParamPad" />
<module name="OperatorWrap">
<property name="option" value="NL" />
<property name="tokens" value="BAND, BOR, BSR, BXOR, DIV, EQUAL, GE, GT, LAND, LE, LITERAL_INSTANCEOF, LOR, LT, MINUS, MOD, NOT_EQUAL, QUESTION, SL, SR, STAR " />
</module>
<module name="AnnotationLocation">
<property name="tokens" value="CLASS_DEF, INTERFACE_DEF, ENUM_DEF, METHOD_DEF, CTOR_DEF" />
</module>
<module name="AnnotationLocation">
<property name="tokens" value="VARIABLE_DEF" />
<property name="allowSamelineMultipleAnnotations" value="true" />
</module>
<module name="NonEmptyAtclauseDescription" />
<module name="JavadocMethod">
<property name="allowMissingJavadoc" value="true" />
<property name="allowMissingParamTags" value="true" />
<property name="allowMissingThrowsTags" value="true" />
<property name="allowMissingReturnTag" value="true" />
<property name="allowedAnnotations" value="Override,Test,BeforeClass,AfterClass,Before,After" />
<property name="allowThrowsTagsForSubclasses" value="true" />
</module>
<module name="SingleLineJavadoc" />
</module>
</module>
</checkstyleRules>
<violationSeverity>warning</violationSeverity>
<includeTestSourceDirectory>true</includeTestSourceDirectory>
</configuration>
</plugin>
</plugins>
</build>
<profiles>
<profile>
<!-- Checks style and licensing requirements. This is a good idea to run
for contributions and for the release process. While it would be nice to
run always these plugins can considerably slow the build and have proven
to create unstable builds in our multi-module project and when building using
multiple threads. The stability issues seen with Checkstyle in multi-module
builds include false-positives and false negatives. -->
<id>contrib-check</id>
<build>
<plugins>
<plugin>
<groupId>org.apache.rat</groupId>
<artifactId>apache-rat-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>check</goal>
</goals>
<phase>verify</phase>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
<executions>
<execution>
<id>check-style</id>
<goals>
<goal>check</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
</profiles>
</project>

View File

@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.couchbase;
import org.apache.nifi.flowfile.attributes.FlowFileAttributeKey;
/**
* Couchbase related attribute keys.
*/
public enum CouchbaseAttributes implements FlowFileAttributeKey {
/**
* A reference to the related cluster.
*/
Cluster("couchbase.cluster"),
/**
* A related bucket name.
*/
Bucket("couchbase.bucket"),
/**
* The id of a related document.
*/
DocId("couchbase.doc.id"),
/**
* The CAS value of a related document.
*/
Cas("couchbase.doc.cas"),
/**
* The expiration of a related document.
*/
Expiry("couchbase.doc.expiry"),
;
private final String key;
private CouchbaseAttributes(final String key) {
this.key = key;
}
@Override
public String key() {
return key;
}
}

View File

@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.couchbase;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.controller.ControllerService;
import com.couchbase.client.java.Bucket;
/**
* Provides a connection to a Couchbase Server cluster throughout a NiFi Data
* flow.
*/
@CapabilityDescription("Provides a centralized Couchbase connection.")
public interface CouchbaseClusterControllerService extends ControllerService {
/**
* Open a bucket connection.
* @param bucketName the bucket name to access
* @return a connected bucket instance
*/
public Bucket openBucket(String bucketName);
}

View File

@ -0,0 +1,130 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.couchbase;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.reporting.InitializationException;
import com.couchbase.client.core.CouchbaseException;
import com.couchbase.client.java.Bucket;
import com.couchbase.client.java.CouchbaseCluster;
/**
* Provides a centralized Couchbase connection and bucket passwords management.
*/
@CapabilityDescription("Provides a centralized Couchbase connection and bucket passwords management."
+ " Bucket passwords can be specified via dynamic properties.")
@Tags({ "nosql", "couchbase", "database", "connection" })
@DynamicProperty(name = "Bucket Password for BUCKET_NAME", value = "bucket password", description = "Specify bucket password if neseccery.")
public class CouchbaseClusterService extends AbstractControllerService implements CouchbaseClusterControllerService {
public static final PropertyDescriptor CONNECTION_STRING = new PropertyDescriptor
.Builder().name("Connection String")
.description("The hostnames or ip addresses of the bootstraping nodes and optional parameters."
+ " Syntax) couchbase://node1,node2,nodeN?param1=value1&param2=value2&paramN=valueN")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
private static final List<PropertyDescriptor> properties;
static {
final List<PropertyDescriptor> props = new ArrayList<>();
props.add(CONNECTION_STRING);
properties = Collections.unmodifiableList(props);
}
private static final String DYNAMIC_PROP_BUCKET_PASSWORD = "Bucket Password for ";
private static final Map<String, String> bucketPasswords = new HashMap<>();
private volatile CouchbaseCluster cluster;
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return properties;
}
@Override
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(
String propertyDescriptorName) {
if(propertyDescriptorName.startsWith(DYNAMIC_PROP_BUCKET_PASSWORD)){
return new PropertyDescriptor
.Builder().name(propertyDescriptorName)
.description("Bucket password.")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.dynamic(true)
.sensitive(true)
.build();
}
return null;
}
/**
* Establish a connection to a Couchbase cluster.
* @param context the configuration context
* @throws InitializationException if unable to connect a Couchbase cluster
*/
@OnEnabled
public void onConfigured(final ConfigurationContext context) throws InitializationException {
for(PropertyDescriptor p : context.getProperties().keySet()){
if(p.isDynamic() && p.getName().startsWith(DYNAMIC_PROP_BUCKET_PASSWORD)){
String bucketName = p.getName().substring(DYNAMIC_PROP_BUCKET_PASSWORD.length());
String password = context.getProperty(p).getValue();
bucketPasswords.put(bucketName, password);
}
}
try {
cluster = CouchbaseCluster.fromConnectionString(context.getProperty(CONNECTION_STRING).getValue());
} catch(CouchbaseException e) {
throw new InitializationException(e);
}
}
@Override
public Bucket openBucket(String bucketName){
return cluster.openBucket(bucketName, bucketPasswords.get(bucketName));
}
/**
* Disconnect from the Couchbase cluster.
*/
@OnDisabled
public void shutdown() {
if(cluster != null){
cluster.disconnect();
cluster = null;
}
}
}

View File

@ -0,0 +1,174 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.couchbase;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.couchbase.CouchbaseClusterControllerService;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.util.StandardValidators;
import com.couchbase.client.java.Bucket;
/**
* Provides common functionalities for Couchbase processors.
*/
public abstract class AbstractCouchbaseProcessor extends AbstractProcessor {
public static final PropertyDescriptor DOCUMENT_TYPE = new PropertyDescriptor
.Builder().name("Document Type")
.description("The type of contents.")
.required(true)
.allowableValues(DocumentType.values())
.defaultValue(DocumentType.Json.toString())
.build();
public static final PropertyDescriptor DOC_ID = new PropertyDescriptor
.Builder().name("Static Document Id")
.description("A static, fixed Couchbase document id.")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor DOC_ID_EXP = new PropertyDescriptor
.Builder().name("Document Id Expression")
.description("An expression to construct the Couchbase document id."
+ " If 'Static Document Id' is specified, then 'Static Document Id' is used.")
.required(false)
.expressionLanguageSupported(true)
.addValidator(StandardValidators.ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR)
.build();
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("All FlowFiles that are written to Couchbase Server are routed to this relationship.")
.build();
public static final Relationship REL_ORIGINAL = new Relationship.Builder()
.name("original")
.description("The original input file will be routed to this destination when it has been successfully processed.")
.build();
public static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("All FlowFiles that cannot written to Couchbase Server are routed to this relationship.")
.build();
public static final PropertyDescriptor COUCHBASE_CLUSTER_SERVICE = new PropertyDescriptor
.Builder().name("Couchbase Cluster Controller Service")
.description("A Couchbase Cluster Controller Service which manages connections to a Couchbase cluster.")
.required(true)
.identifiesControllerService(CouchbaseClusterControllerService.class)
.build();
public static final PropertyDescriptor BUCKET_NAME = new PropertyDescriptor
.Builder().name("Bucket Name")
.description("The name of bucket to access.")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.defaultValue("default")
.build();
private List<PropertyDescriptor> descriptors;
private Set<Relationship> relationships;
private CouchbaseClusterControllerService clusterService;
@Override
protected final void init(final ProcessorInitializationContext context) {
final List<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>();
descriptors.add(COUCHBASE_CLUSTER_SERVICE);
descriptors.add(BUCKET_NAME);
addSupportedProperties(descriptors);
this.descriptors = Collections.unmodifiableList(descriptors);
final Set<Relationship> relationships = new HashSet<Relationship>();
addSupportedRelationships(relationships);
this.relationships = Collections.unmodifiableSet(relationships);
}
/**
* Add processor specific properties.
* @param descriptors add properties to this list
*/
protected void addSupportedProperties(List<PropertyDescriptor> descriptors) {
return;
}
/**
* Add processor specific relationships.
* @param relationships add relationships to this list
*/
protected void addSupportedRelationships(Set<Relationship> relationships) {
return;
}
@Override
public final Set<Relationship> getRelationships() {
return this.relationships;
}
@Override
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return descriptors;
}
private CouchbaseClusterControllerService getClusterService(final ProcessContext context) {
if(clusterService == null){
synchronized(AbstractCouchbaseProcessor.class){
if(clusterService == null){
clusterService = context.getProperty(COUCHBASE_CLUSTER_SERVICE)
.asControllerService(CouchbaseClusterControllerService.class);
}
}
}
return clusterService;
}
/**
* Open a bucket connection using a CouchbaseClusterControllerService.
* @param context a process context
* @return a bucket instance
*/
protected final Bucket openBucket(final ProcessContext context) {
return getClusterService(context).openBucket(context.getProperty(BUCKET_NAME).getValue());
}
/**
* Generate a transit url.
* @param context a process context
* @return a transit url based on the bucket name and the CouchbaseClusterControllerService name
*/
protected String getTransitUrl(final ProcessContext context) {
return new StringBuilder(context.getProperty(BUCKET_NAME).getValue())
.append('@')
.append(context.getProperty(COUCHBASE_CLUSTER_SERVICE).getValue())
.toString();
}
}

View File

@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.couchbase;
/**
* Supported Couchbase document types.
*
* In order to handle a variety type of document classes such as JsonDocument,
* JsonLongDocument or JsonStringDocument, Couchbase processors use
* RawJsonDocument for Json type.
*
* The distinction between Json and Binary exists because BinaryDocument doesn't
* set Json flag when it stored on Couchbase Server even if the content byte
* array represents a Json string, and it can't be retrieved as a Json document.
*/
public enum DocumentType {
Json,
Binary
}

View File

@ -0,0 +1,172 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.couchbase;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.ReadsAttributes;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.couchbase.CouchbaseAttributes;
import org.apache.nifi.couchbase.CouchbaseClusterControllerService;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ProcessorLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.stream.io.StreamUtils;
import com.couchbase.client.java.Bucket;
import com.couchbase.client.java.document.BinaryDocument;
import com.couchbase.client.java.document.Document;
import com.couchbase.client.java.document.RawJsonDocument;
@Tags({ "nosql", "couchbase", "database", "get" })
@CapabilityDescription("Get a document from Couchbase Server via Key/Value access.")
@SeeAlso({CouchbaseClusterControllerService.class})
@ReadsAttributes({
@ReadsAttribute(attribute = "FlowFile content", description = "Used as a document id if none of 'Static Document Id' or 'Document Id Expression' is specified"),
@ReadsAttribute(attribute = "*", description = "Any attribute can be used as part of a document id by 'Document Id Excepression.")
})
@WritesAttributes({
@WritesAttribute(attribute="couchbase.cluster", description="Cluster where the document was retrieved from."),
@WritesAttribute(attribute="couchbase.bucket", description="Bucket where the document was retrieved from."),
@WritesAttribute(attribute="couchbase.doc.id", description="Id of the document."),
@WritesAttribute(attribute="couchbase.doc.cas", description="CAS of the document."),
@WritesAttribute(attribute="couchbase.doc.expiry", description="Expiration of the document.")
})
public class GetCouchbaseKey extends AbstractCouchbaseProcessor {
@Override
protected void addSupportedProperties(List<PropertyDescriptor> descriptors) {
descriptors.add(DOCUMENT_TYPE);
descriptors.add(DOC_ID);
descriptors.add(DOC_ID_EXP);
}
@Override
protected void addSupportedRelationships(Set<Relationship> relationships) {
relationships.add(REL_SUCCESS);
relationships.add(REL_ORIGINAL);
relationships.add(REL_FAILURE);
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
final ProcessorLog logger = getLogger();
FlowFile inFile = session.get();
String docId = null;
if(!StringUtils.isEmpty(context.getProperty(DOC_ID).getValue())){
docId = context.getProperty(DOC_ID).getValue();
} else {
// Otherwise docId has to be extracted from inFile.
if ( inFile == null ) {
return;
}
if(!StringUtils.isEmpty(context.getProperty(DOC_ID_EXP).getValue())){
docId = context.getProperty(DOC_ID_EXP).evaluateAttributeExpressions(inFile).getValue();
} else {
final byte[] content = new byte[(int) inFile.getSize()];
session.read(inFile, new InputStreamCallback() {
@Override
public void process(final InputStream in) throws IOException {
StreamUtils.fillBuffer(in, content, true);
}
});
docId = new String(content, StandardCharsets.UTF_8);
}
}
if(StringUtils.isEmpty(docId)){
logger.error("Couldn't get document id from from {}", new Object[]{inFile});
session.transfer(inFile, REL_FAILURE);
}
try {
Document<?> doc = null;
byte[] content = null;
Bucket bucket = openBucket(context);
DocumentType documentType = DocumentType.valueOf(context.getProperty(DOCUMENT_TYPE).getValue());
switch (documentType){
case Json : {
RawJsonDocument document = bucket.get(docId, RawJsonDocument.class);
if(document != null){
content = document.content().getBytes(StandardCharsets.UTF_8);
doc = document;
}
break;
}
case Binary : {
BinaryDocument document = bucket.get(docId, BinaryDocument.class);
if(document != null){
content = document.content().array();
doc = document;
}
break;
}
}
if(doc == null) {
logger.info("Document {} was not found in {}", new Object[]{docId, getTransitUrl(context)});
if(inFile != null){
session.transfer(inFile, REL_FAILURE);
}
return;
}
if(inFile != null){
session.transfer(inFile, REL_ORIGINAL);
}
FlowFile outFile = session.create();
outFile = session.importFrom(new ByteArrayInputStream(content), outFile);
Map<String, String> updatedAttrs = new HashMap<>();
updatedAttrs.put(CouchbaseAttributes.Cluster.key(), context.getProperty(COUCHBASE_CLUSTER_SERVICE).getValue());
updatedAttrs.put(CouchbaseAttributes.Bucket.key(), context.getProperty(BUCKET_NAME).getValue());
updatedAttrs.put(CouchbaseAttributes.DocId.key(), docId);
updatedAttrs.put(CouchbaseAttributes.Cas.key(), String.valueOf(doc.cas()));
updatedAttrs.put(CouchbaseAttributes.Expiry.key(), String.valueOf(doc.expiry()));
outFile = session.putAllAttributes(outFile, updatedAttrs);
session.getProvenanceReporter().receive(outFile, getTransitUrl(context));
session.transfer(outFile, REL_SUCCESS);
} catch (Throwable t){
logger.error("Getting docuement {} from Couchbase Server using {} failed due to {}",
new Object[]{docId, inFile, t}, t);
if(inFile != null){
session.transfer(inFile, REL_FAILURE);
}
}
}
}

View File

@ -0,0 +1,164 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.couchbase;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.ReadsAttributes;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.couchbase.CouchbaseAttributes;
import org.apache.nifi.couchbase.CouchbaseClusterControllerService;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ProcessorLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.stream.io.StreamUtils;
import com.couchbase.client.deps.io.netty.buffer.ByteBuf;
import com.couchbase.client.deps.io.netty.buffer.Unpooled;
import com.couchbase.client.java.PersistTo;
import com.couchbase.client.java.ReplicateTo;
import com.couchbase.client.java.document.BinaryDocument;
import com.couchbase.client.java.document.Document;
import com.couchbase.client.java.document.RawJsonDocument;
@Tags({ "nosql", "couchbase", "database", "put" })
@CapabilityDescription("Put a document to Couchbase Server via Key/Value access.")
@SeeAlso({CouchbaseClusterControllerService.class})
@ReadsAttributes({
@ReadsAttribute(attribute = "uuid", description = "Used as a document id if none of 'Static Document Id' or 'Document Id Expression' is specified"),
@ReadsAttribute(attribute = "*", description = "Any attribute can be used as part of a document id by 'Document Id Excepression.")
})
@WritesAttributes({
@WritesAttribute(attribute="couchbase.cluster", description="Cluster where the document was stored."),
@WritesAttribute(attribute="couchbase.bucket", description="Bucket where the document was stored."),
@WritesAttribute(attribute="couchbase.doc.id", description="Id of the document."),
@WritesAttribute(attribute="couchbase.doc.cas", description="CAS of the document."),
@WritesAttribute(attribute="couchbase.doc.expiry", description="Expiration of the document.")
})
public class PutCouchbaseKey extends AbstractCouchbaseProcessor {
public static final PropertyDescriptor PERSIST_TO = new PropertyDescriptor
.Builder().name("Persist To")
.description("Durability constraint about disk persistence.")
.required(true)
.allowableValues(PersistTo.values())
.defaultValue(PersistTo.NONE.toString())
.build();
public static final PropertyDescriptor REPLICATE_TO = new PropertyDescriptor
.Builder().name("Replicate To")
.description("Durability constraint about replication.")
.required(true)
.allowableValues(ReplicateTo.values())
.defaultValue(ReplicateTo.NONE.toString())
.build();
@Override
protected void addSupportedProperties(List<PropertyDescriptor> descriptors) {
descriptors.add(DOCUMENT_TYPE);
descriptors.add(DOC_ID);
descriptors.add(DOC_ID_EXP);
descriptors.add(PERSIST_TO);
descriptors.add(REPLICATE_TO);
}
@Override
protected void addSupportedRelationships(Set<Relationship> relationships) {
relationships.add(REL_SUCCESS);
relationships.add(REL_FAILURE);
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
final ProcessorLog logger = getLogger();
FlowFile flowFile = session.get();
if ( flowFile == null ) {
return;
}
try {
final byte[] content = new byte[(int) flowFile.getSize()];
session.read(flowFile, new InputStreamCallback() {
@Override
public void process(final InputStream in) throws IOException {
StreamUtils.fillBuffer(in, content, true);
}
});
String docId = String.valueOf(flowFile.getAttribute(CoreAttributes.UUID.key()));
if(!StringUtils.isEmpty(context.getProperty(DOC_ID).getValue())){
docId = context.getProperty(DOC_ID).getValue();
} else if(!StringUtils.isEmpty(context.getProperty(DOC_ID_EXP).getValue())){
docId = context.getProperty(DOC_ID_EXP).evaluateAttributeExpressions(flowFile).getValue();
}
Document<?> doc = null;
DocumentType documentType = DocumentType.valueOf(context.getProperty(DOCUMENT_TYPE).getValue());
switch (documentType){
case Json : {
doc = RawJsonDocument.create(docId, new String(content, StandardCharsets.UTF_8));
break;
}
case Binary : {
ByteBuf buf = Unpooled.copiedBuffer(content);
doc = BinaryDocument.create(docId, buf);
break;
}
}
PersistTo persistTo = PersistTo.valueOf(context.getProperty(PERSIST_TO).getValue());
ReplicateTo replicateTo = ReplicateTo.valueOf(context.getProperty(REPLICATE_TO).getValue());
doc = openBucket(context).upsert(doc, persistTo, replicateTo);
Map<String, String> updatedAttrs = new HashMap<>();
updatedAttrs.put(CouchbaseAttributes.Cluster.key(), context.getProperty(COUCHBASE_CLUSTER_SERVICE).getValue());
updatedAttrs.put(CouchbaseAttributes.Bucket.key(), context.getProperty(BUCKET_NAME).getValue());
updatedAttrs.put(CouchbaseAttributes.DocId.key(), docId);
updatedAttrs.put(CouchbaseAttributes.Cas.key(), String.valueOf(doc.cas()));
updatedAttrs.put(CouchbaseAttributes.Expiry.key(), String.valueOf(doc.expiry()));
flowFile = session.putAllAttributes(flowFile, updatedAttrs);
session.getProvenanceReporter().send(flowFile, getTransitUrl(context));
session.transfer(flowFile, REL_SUCCESS);
} catch (Throwable t) {
logger.error("Writing {} into Couchbase Server failed due to {}", new Object[]{flowFile, t}, t);
session.transfer(flowFile, REL_FAILURE);
}
}
}

View File

@ -0,0 +1,15 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.nifi.couchbase.CouchbaseClusterService

View File

@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.nifi.processors.couchbase.GetCouchbaseKey
org.apache.nifi.processors.couchbase.PutCouchbaseKey

View File

@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.couchbase;
import org.apache.nifi.couchbase.CouchbaseClusterControllerService;
import org.apache.nifi.couchbase.CouchbaseClusterService;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class TestCouchbaseClusterService {
private static final String SERVICE_ID = "couchbaseClusterService";
private TestRunner testRunner;
@Before
public void init() throws Exception {
System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "info");
System.setProperty("org.slf4j.simpleLogger.showDateTime", "true");
System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.processors.couchbase.PutCouchbaseKey", "debug");
System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.couchbase.CouchbaseClusterService", "debug");
System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.couchbase.TestCouchbaseClusterService", "debug");
testRunner = TestRunners.newTestRunner(PutCouchbaseKey.class);
testRunner.setValidateExpressionUsage(false);
}
@Test
public void testConnectionFailure() throws InitializationException {
String connectionString = "couchbase://invalid-hostname";
CouchbaseClusterControllerService service = new CouchbaseClusterService();
testRunner.addControllerService(SERVICE_ID, service);
testRunner.setProperty(service, CouchbaseClusterService.CONNECTION_STRING, connectionString);
try {
testRunner.enableControllerService(service);
Assert.fail("The service shouldn't be enabled when it couldn't connect to a cluster.");
} catch (AssertionError e) {
}
}
}

View File

@ -0,0 +1,224 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.couchbase;
import static org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.BUCKET_NAME;
import static org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.COUCHBASE_CLUSTER_SERVICE;
import static org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.DOCUMENT_TYPE;
import static org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.DOC_ID;
import static org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.DOC_ID_EXP;
import static org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.REL_FAILURE;
import static org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.REL_ORIGINAL;
import static org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.REL_SUCCESS;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import org.apache.nifi.couchbase.CouchbaseAttributes;
import org.apache.nifi.couchbase.CouchbaseClusterControllerService;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Before;
import org.junit.Test;
import com.couchbase.client.core.ServiceNotAvailableException;
import com.couchbase.client.deps.io.netty.buffer.ByteBuf;
import com.couchbase.client.deps.io.netty.buffer.Unpooled;
import com.couchbase.client.java.Bucket;
import com.couchbase.client.java.document.BinaryDocument;
import com.couchbase.client.java.document.RawJsonDocument;
public class TestGetCouchbaseKey {
private static final String SERVICE_ID = "couchbaseClusterService";
private TestRunner testRunner;
@Before
public void init() throws Exception {
System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "info");
System.setProperty("org.slf4j.simpleLogger.showDateTime", "true");
System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.processors.couchbase.GetCouchbaseKey", "debug");
System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.processors.couchbase.TestGetCouchbaseKey", "debug");
testRunner = TestRunners.newTestRunner(GetCouchbaseKey.class);
testRunner.setValidateExpressionUsage(false);
}
private void setupMockBucket(Bucket bucket) throws InitializationException {
CouchbaseClusterControllerService service = mock(CouchbaseClusterControllerService.class);
when(service.getIdentifier()).thenReturn(SERVICE_ID);
when(service.openBucket(anyString())).thenReturn(bucket);
testRunner.addControllerService(SERVICE_ID, service);
testRunner.enableControllerService(service);
testRunner.setProperty(COUCHBASE_CLUSTER_SERVICE, SERVICE_ID);
}
@Test
public void testStaticDocId() throws Exception {
String bucketName = "bucket-1";
String docId = "doc-a";
Bucket bucket = mock(Bucket.class);
String content = "{\"key\":\"value\"}";
int expiry = 100;
long cas = 200L;
when(bucket.get(docId, RawJsonDocument.class)).thenReturn(RawJsonDocument.create(docId, expiry, content, cas));
setupMockBucket(bucket);
testRunner.setProperty(BUCKET_NAME, bucketName);
testRunner.setProperty(DOC_ID, docId);
testRunner.run();
testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
testRunner.assertTransferCount(REL_SUCCESS, 1);
testRunner.assertTransferCount(REL_FAILURE, 0);
MockFlowFile outFile = testRunner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
outFile.assertContentEquals(content);
outFile.assertAttributeEquals(CouchbaseAttributes.Cluster.key(), SERVICE_ID);
outFile.assertAttributeEquals(CouchbaseAttributes.Bucket.key(), bucketName);
outFile.assertAttributeEquals(CouchbaseAttributes.DocId.key(), docId);
outFile.assertAttributeEquals(CouchbaseAttributes.Cas.key(), String.valueOf(cas));
outFile.assertAttributeEquals(CouchbaseAttributes.Expiry.key(), String.valueOf(expiry));
}
/**
* Use static document id even if doc id expression is set.
*/
@Test
public void testStaticDocIdAndDocIdExp() throws Exception {
String docId = "doc-a";
String docIdExp = "${someProperty}";
Bucket bucket = mock(Bucket.class);
String content = "{\"key\":\"value\"}";
when(bucket.get(docId, RawJsonDocument.class)).thenReturn(RawJsonDocument.create(docId, content));
setupMockBucket(bucket);
testRunner.setProperty(DOC_ID, docId);
testRunner.setProperty(DOC_ID_EXP, docIdExp);
testRunner.run();
testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
testRunner.assertTransferCount(REL_SUCCESS, 1);
testRunner.assertTransferCount(REL_FAILURE, 0);
MockFlowFile outFile = testRunner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
outFile.assertContentEquals(content);
}
@Test
public void testDocIdExp() throws Exception {
String docIdExp = "${'someProperty'}";
String somePropertyValue = "doc-p";
Bucket bucket = mock(Bucket.class);
String content = "{\"key\":\"value\"}";
when(bucket.get(somePropertyValue, RawJsonDocument.class))
.thenReturn(RawJsonDocument.create(somePropertyValue, content));
setupMockBucket(bucket);
testRunner.setProperty(DOC_ID_EXP, docIdExp);
byte[] inFileData = "input FlowFile data".getBytes(StandardCharsets.UTF_8);
Map<String, String> properties = new HashMap<>();
properties.put("someProperty", somePropertyValue);
testRunner.enqueue(inFileData, properties);
testRunner.run();
testRunner.assertTransferCount(REL_SUCCESS, 1);
testRunner.assertTransferCount(REL_ORIGINAL, 1);
testRunner.assertTransferCount(REL_FAILURE, 0);
MockFlowFile outFile = testRunner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
outFile.assertContentEquals(content);
}
@Test
public void testInputFlowFileContent() throws Exception {
Bucket bucket = mock(Bucket.class);
String inFileDataStr = "doc-in";
String content = "{\"key\":\"value\"}";
when(bucket.get(inFileDataStr, RawJsonDocument.class))
.thenReturn(RawJsonDocument.create(inFileDataStr, content));
setupMockBucket(bucket);
byte[] inFileData = inFileDataStr.getBytes(StandardCharsets.UTF_8);
testRunner.enqueue(inFileData);
testRunner.run();
testRunner.assertTransferCount(REL_SUCCESS, 1);
testRunner.assertTransferCount(REL_ORIGINAL, 1);
testRunner.assertTransferCount(REL_FAILURE, 0);
MockFlowFile outFile = testRunner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
outFile.assertContentEquals(content);
}
@Test
public void testBinaryDocument() throws Exception {
Bucket bucket = mock(Bucket.class);
String inFileDataStr = "doc-in";
String content = "binary";
ByteBuf buf = Unpooled.copiedBuffer(content.getBytes(StandardCharsets.UTF_8));
when(bucket.get(inFileDataStr, BinaryDocument.class))
.thenReturn(BinaryDocument.create(inFileDataStr, buf));
setupMockBucket(bucket);
byte[] inFileData = inFileDataStr.getBytes(StandardCharsets.UTF_8);
testRunner.enqueue(inFileData);
testRunner.setProperty(DOCUMENT_TYPE, DocumentType.Binary.toString());
testRunner.run();
testRunner.assertTransferCount(REL_SUCCESS, 1);
testRunner.assertTransferCount(REL_ORIGINAL, 1);
testRunner.assertTransferCount(REL_FAILURE, 0);
MockFlowFile outFile = testRunner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
outFile.assertContentEquals(content);
}
@Test
public void testCouchbaseFailure() throws Exception {
Bucket bucket = mock(Bucket.class);
String inFileDataStr = "doc-in";
when(bucket.get(inFileDataStr, RawJsonDocument.class))
.thenThrow(new ServiceNotAvailableException());
setupMockBucket(bucket);
byte[] inFileData = inFileDataStr.getBytes(StandardCharsets.UTF_8);
testRunner.enqueue(inFileData);
testRunner.run();
testRunner.assertTransferCount(REL_SUCCESS, 0);
testRunner.assertTransferCount(REL_ORIGINAL, 0);
testRunner.assertTransferCount(REL_FAILURE, 1);
MockFlowFile outFile = testRunner.getFlowFilesForRelationship(REL_FAILURE).get(0);
outFile.assertContentEquals(inFileDataStr);
}
}

View File

@ -0,0 +1,254 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.couchbase;
import static org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.BUCKET_NAME;
import static org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.COUCHBASE_CLUSTER_SERVICE;
import static org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.DOC_ID;
import static org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.DOC_ID_EXP;
import static org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.REL_FAILURE;
import static org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.REL_SUCCESS;
import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import org.apache.nifi.couchbase.CouchbaseAttributes;
import org.apache.nifi.couchbase.CouchbaseClusterControllerService;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import com.couchbase.client.java.Bucket;
import com.couchbase.client.java.PersistTo;
import com.couchbase.client.java.ReplicateTo;
import com.couchbase.client.java.document.RawJsonDocument;
import com.couchbase.client.java.error.DurabilityException;
public class TestPutCouchbaseKey {
private static final String SERVICE_ID = "couchbaseClusterService";
private TestRunner testRunner;
@Before
public void init() throws Exception {
System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "info");
System.setProperty("org.slf4j.simpleLogger.showDateTime", "true");
System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.processors.couchbase.PutCouchbaseKey", "debug");
System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.processors.couchbase.TestPutCouchbaseKey", "debug");
testRunner = TestRunners.newTestRunner(PutCouchbaseKey.class);
testRunner.setValidateExpressionUsage(false);
}
private void setupMockBucket(Bucket bucket) throws InitializationException {
CouchbaseClusterControllerService service = mock(CouchbaseClusterControllerService.class);
when(service.getIdentifier()).thenReturn(SERVICE_ID);
when(service.openBucket(anyString())).thenReturn(bucket);
testRunner.addControllerService(SERVICE_ID, service);
testRunner.enableControllerService(service);
testRunner.setProperty(COUCHBASE_CLUSTER_SERVICE, SERVICE_ID);
}
@Test
public void testStaticDocId() throws Exception {
String bucketName = "bucket-1";
String docId = "doc-a";
int expiry = 100;
long cas = 200L;
String inFileData = "{\"key\":\"value\"}";
byte[] inFileDataBytes = inFileData.getBytes(StandardCharsets.UTF_8);
Bucket bucket = mock(Bucket.class);
when(bucket.upsert(any(RawJsonDocument.class), eq(PersistTo.NONE), eq(ReplicateTo.NONE)))
.thenReturn(RawJsonDocument.create(docId, expiry, inFileData, cas));
setupMockBucket(bucket);
testRunner.enqueue(inFileDataBytes);
testRunner.setProperty(BUCKET_NAME, bucketName);
testRunner.setProperty(DOC_ID, docId);
testRunner.run();
verify(bucket, times(1)).upsert(any(RawJsonDocument.class), eq(PersistTo.NONE), eq(ReplicateTo.NONE));
testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
testRunner.assertTransferCount(REL_SUCCESS, 1);
testRunner.assertTransferCount(REL_FAILURE, 0);
MockFlowFile outFile = testRunner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
outFile.assertContentEquals(inFileData);
outFile.assertAttributeEquals(CouchbaseAttributes.Cluster.key(), SERVICE_ID);
outFile.assertAttributeEquals(CouchbaseAttributes.Bucket.key(), bucketName);
outFile.assertAttributeEquals(CouchbaseAttributes.DocId.key(), docId);
outFile.assertAttributeEquals(CouchbaseAttributes.Cas.key(), String.valueOf(cas));
outFile.assertAttributeEquals(CouchbaseAttributes.Expiry.key(), String.valueOf(expiry));
}
@Test
public void testDurabilityConstraint() throws Exception {
String docId = "doc-a";
String inFileData = "{\"key\":\"value\"}";
byte[] inFileDataBytes = inFileData.getBytes(StandardCharsets.UTF_8);
Bucket bucket = mock(Bucket.class);
when(bucket.upsert(any(RawJsonDocument.class), eq(PersistTo.MASTER), eq(ReplicateTo.ONE)))
.thenReturn(RawJsonDocument.create(docId, inFileData));
setupMockBucket(bucket);
testRunner.enqueue(inFileDataBytes);
testRunner.setProperty(DOC_ID, docId);
testRunner.setProperty(PutCouchbaseKey.PERSIST_TO, PersistTo.MASTER.toString());
testRunner.setProperty(PutCouchbaseKey.REPLICATE_TO, ReplicateTo.ONE.toString());
testRunner.run();
verify(bucket, times(1)).upsert(any(RawJsonDocument.class), eq(PersistTo.MASTER), eq(ReplicateTo.ONE));
testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
testRunner.assertTransferCount(REL_SUCCESS, 1);
testRunner.assertTransferCount(REL_FAILURE, 0);
MockFlowFile outFile = testRunner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
outFile.assertContentEquals(inFileData);
}
/**
* Use static document id even if doc id expression is set.
*/
@Test
public void testStaticDocIdAndDocIdExp() throws Exception {
String docId = "doc-a";
String docIdExp = "${someProperty}";
String inFileData = "{\"key\":\"value\"}";
byte[] inFileDataBytes = inFileData.getBytes(StandardCharsets.UTF_8);
Bucket bucket = mock(Bucket.class);
when(bucket.upsert(any(RawJsonDocument.class), eq(PersistTo.NONE), eq(ReplicateTo.NONE)))
.thenReturn(RawJsonDocument.create(docId, inFileData));
setupMockBucket(bucket);
testRunner.enqueue(inFileDataBytes);
testRunner.setProperty(DOC_ID, docId);
testRunner.setProperty(DOC_ID_EXP, docIdExp);
testRunner.run();
verify(bucket, times(1)).upsert(any(RawJsonDocument.class), eq(PersistTo.NONE), eq(ReplicateTo.NONE));
testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
testRunner.assertTransferCount(REL_SUCCESS, 1);
testRunner.assertTransferCount(REL_FAILURE, 0);
MockFlowFile outFile = testRunner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
outFile.assertContentEquals(inFileData);
}
@Test
public void testDocIdExp() throws Exception {
String docIdExp = "${'someProperty'}";
String somePropertyValue = "doc-p";
String inFileData = "{\"key\":\"value\"}";
byte[] inFileDataBytes = inFileData.getBytes(StandardCharsets.UTF_8);
Bucket bucket = mock(Bucket.class);
when(bucket.upsert(any(RawJsonDocument.class), eq(PersistTo.NONE), eq(ReplicateTo.NONE)))
.thenReturn(RawJsonDocument.create(somePropertyValue, inFileData));
setupMockBucket(bucket);
testRunner.setProperty(DOC_ID_EXP, docIdExp);
Map<String, String> properties = new HashMap<>();
properties.put("someProperty", somePropertyValue);
testRunner.enqueue(inFileDataBytes, properties);
testRunner.run();
verify(bucket, times(1)).upsert(any(RawJsonDocument.class), eq(PersistTo.NONE), eq(ReplicateTo.NONE));
testRunner.assertTransferCount(REL_SUCCESS, 1);
testRunner.assertTransferCount(REL_FAILURE, 0);
MockFlowFile outFile = testRunner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
outFile.assertContentEquals(inFileData);
}
@Test
public void testInputFlowFileUuid() throws Exception {
String uuid = "00029362-5106-40e8-b8a9-bf2cecfbc0d7";
String inFileData = "{\"key\":\"value\"}";
byte[] inFileDataBytes = inFileData.getBytes(StandardCharsets.UTF_8);
Bucket bucket = mock(Bucket.class);
when(bucket.upsert(any(RawJsonDocument.class), eq(PersistTo.NONE), eq(ReplicateTo.NONE)))
.thenReturn(RawJsonDocument.create(uuid, inFileData));
setupMockBucket(bucket);
Map<String, String> properties = new HashMap<>();
properties.put(CoreAttributes.UUID.key(), uuid);
testRunner.enqueue(inFileDataBytes, properties);
testRunner.run();
ArgumentCaptor<RawJsonDocument> capture = ArgumentCaptor.forClass(RawJsonDocument.class);
verify(bucket, times(1)).upsert(capture.capture(), eq(PersistTo.NONE), eq(ReplicateTo.NONE));
assertEquals(uuid, capture.getValue().id());
testRunner.assertTransferCount(REL_SUCCESS, 1);
testRunner.assertTransferCount(REL_FAILURE, 0);
MockFlowFile outFile = testRunner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
outFile.assertContentEquals(inFileData);
}
@Test
public void testCouchbaseFailure() throws Exception {
String docId = "doc-a";
String inFileData = "{\"key\":\"value\"}";
byte[] inFileDataBytes = inFileData.getBytes(StandardCharsets.UTF_8);
Bucket bucket = mock(Bucket.class);
when(bucket.upsert(any(RawJsonDocument.class), eq(PersistTo.NONE), eq(ReplicateTo.ONE)))
.thenThrow(new DurabilityException());
setupMockBucket(bucket);
testRunner.enqueue(inFileDataBytes);
testRunner.setProperty(DOC_ID, docId);
testRunner.setProperty(PutCouchbaseKey.REPLICATE_TO, ReplicateTo.ONE.toString());
testRunner.run();
verify(bucket, times(1)).upsert(any(RawJsonDocument.class), eq(PersistTo.NONE), eq(ReplicateTo.ONE));
testRunner.assertAllFlowFilesTransferred(REL_FAILURE);
testRunner.assertTransferCount(REL_SUCCESS, 0);
testRunner.assertTransferCount(REL_FAILURE, 1);
MockFlowFile outFile = testRunner.getFlowFilesForRelationship(REL_FAILURE).get(0);
outFile.assertContentEquals(inFileData);
}
}

View File

@ -0,0 +1,35 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-nar-bundles</artifactId>
<version>0.3.1-SNAPSHOT</version>
</parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-couchbase-bundle</artifactId>
<version>0.3.1-SNAPSHOT</version>
<packaging>pom</packaging>
<modules>
<module>nifi-couchbase-processors</module>
<module>nifi-couchbase-nar</module>
</modules>
</project>

View File

@ -45,6 +45,7 @@
<module>nifi-ambari-bundle</module>
<module>nifi-image-bundle</module>
<module>nifi-avro-bundle</module>
<module>nifi-couchbase-bundle</module>
</modules>
<dependencyManagement>
<dependencies>

View File

@ -906,6 +906,12 @@
<version>0.3.1-SNAPSHOT</version>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-couchbase-nar</artifactId>
<version>0.3.1-SNAPSHOT</version>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-properties</artifactId>