From 2466a24530f493238024eb22dc041eebe96621f3 Mon Sep 17 00:00:00 2001 From: ijokarumawak Date: Sat, 26 Sep 2015 02:46:37 +0900 Subject: [PATCH] nifi-992 Adding nifi-couchbase-bundle. - new CouchbaseClusterControllerService - new Processors - GetCouchbaseKey - PutCouchbaseKey Signed-off-by: Bryan Bende --- nifi-assembly/NOTICE | 10 + nifi-assembly/pom.xml | 5 + .../nifi-couchbase-nar/pom.xml | 37 +++ .../nifi-couchbase-processors/pom.xml | 208 ++++++++++++++ .../nifi/couchbase/CouchbaseAttributes.java | 59 ++++ .../CouchbaseClusterControllerService.java | 38 +++ .../couchbase/CouchbaseClusterService.java | 130 +++++++++ .../couchbase/AbstractCouchbaseProcessor.java | 174 ++++++++++++ .../processors/couchbase/DocumentType.java | 36 +++ .../processors/couchbase/GetCouchbaseKey.java | 172 ++++++++++++ .../processors/couchbase/PutCouchbaseKey.java | 164 +++++++++++ ...g.apache.nifi.controller.ControllerService | 15 ++ .../org.apache.nifi.processor.Processor | 16 ++ .../TestCouchbaseClusterService.java | 59 ++++ .../couchbase/TestGetCouchbaseKey.java | 224 +++++++++++++++ .../couchbase/TestPutCouchbaseKey.java | 254 ++++++++++++++++++ .../nifi-couchbase-bundle/pom.xml | 35 +++ nifi-nar-bundles/pom.xml | 1 + pom.xml | 6 + 19 files changed, 1643 insertions(+) create mode 100644 nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-nar/pom.xml create mode 100644 nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/pom.xml create mode 100644 nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/couchbase/CouchbaseAttributes.java create mode 100644 nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/couchbase/CouchbaseClusterControllerService.java create mode 100644 nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/couchbase/CouchbaseClusterService.java create mode 100644 nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/AbstractCouchbaseProcessor.java create mode 100644 nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/DocumentType.java create mode 100644 nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/GetCouchbaseKey.java create mode 100644 nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/PutCouchbaseKey.java create mode 100644 nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService create mode 100644 nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor create mode 100644 nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestCouchbaseClusterService.java create mode 100644 nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestGetCouchbaseKey.java create mode 100644 nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestPutCouchbaseKey.java create mode 100644 nifi-nar-bundles/nifi-couchbase-bundle/pom.xml diff --git a/nifi-assembly/NOTICE b/nifi-assembly/NOTICE index 3362740c1a..1f7e3f123a 100644 --- a/nifi-assembly/NOTICE +++ b/nifi-assembly/NOTICE @@ -709,6 +709,16 @@ The following binary components are provided under the Apache Software License v Metadata-Extractor Copyright 2002-2015 Drew Noakes + (ASLv2) Couchbase Java SDK + The following NOTICE information applies: + Couchbase Java SDK + Copyright 2014 Couchbase, Inc. + + (ASLv2) RxJava + The following NOTICE information applies: + Couchbase Java SDK + Copyright 2012 Netflix, Inc. + ************************ Common Development and Distribution License 1.1 ************************ diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml index f162588f6e..de4fdcb4e5 100644 --- a/nifi-assembly/pom.xml +++ b/nifi-assembly/pom.xml @@ -227,6 +227,11 @@ language governing permissions and limitations under the License. --> nifi-image-nar nar + + org.apache.nifi + nifi-couchbase-nar + nar + diff --git a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-nar/pom.xml b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-nar/pom.xml new file mode 100644 index 0000000000..4f58d1ff43 --- /dev/null +++ b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-nar/pom.xml @@ -0,0 +1,37 @@ + + + + 4.0.0 + + + org.apache.nifi + nifi-couchbase-bundle + 0.3.1-SNAPSHOT + + + nifi-couchbase-nar + 0.3.1-SNAPSHOT + nar + + + + org.apache.nifi + nifi-couchbase-processors + 0.3.1-SNAPSHOT + + + + diff --git a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/pom.xml b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/pom.xml new file mode 100644 index 0000000000..33b0baa2cd --- /dev/null +++ b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/pom.xml @@ -0,0 +1,208 @@ + + + + 4.0.0 + + + org.apache.nifi + nifi-couchbase-bundle + 0.3.1-SNAPSHOT + + + nifi-couchbase-processors + jar + + + + org.apache.nifi + nifi-api + + + org.apache.nifi + nifi-processor-utils + + + org.apache.nifi + nifi-mock + test + + + org.slf4j + slf4j-simple + test + + + com.couchbase.client + java-client + 2.2.0 + + + junit + junit + 4.11 + test + + + org.apache.nifi + nifi-mock + test + + + + + + + org.apache.maven.plugins + maven-checkstyle-plugin + 2.15 + + + com.puppycrawl.tools + checkstyle + 6.5 + + + + + + + + org.apache.maven.plugins + maven-checkstyle-plugin + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + warning + true + + + + + + + + contrib-check + + + + org.apache.rat + apache-rat-plugin + + + + check + + verify + + + + + org.apache.maven.plugins + maven-checkstyle-plugin + + + check-style + + check + + + + + + + + + diff --git a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/couchbase/CouchbaseAttributes.java b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/couchbase/CouchbaseAttributes.java new file mode 100644 index 0000000000..a4d69fc1af --- /dev/null +++ b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/couchbase/CouchbaseAttributes.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.couchbase; + +import org.apache.nifi.flowfile.attributes.FlowFileAttributeKey; + +/** + * Couchbase related attribute keys. + */ +public enum CouchbaseAttributes implements FlowFileAttributeKey { + + /** + * A reference to the related cluster. + */ + Cluster("couchbase.cluster"), + /** + * A related bucket name. + */ + Bucket("couchbase.bucket"), + /** + * The id of a related document. + */ + DocId("couchbase.doc.id"), + /** + * The CAS value of a related document. + */ + Cas("couchbase.doc.cas"), + /** + * The expiration of a related document. + */ + Expiry("couchbase.doc.expiry"), + ; + + private final String key; + + private CouchbaseAttributes(final String key) { + this.key = key; + } + + @Override + public String key() { + return key; + } + +} diff --git a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/couchbase/CouchbaseClusterControllerService.java b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/couchbase/CouchbaseClusterControllerService.java new file mode 100644 index 0000000000..fcf72d50ef --- /dev/null +++ b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/couchbase/CouchbaseClusterControllerService.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.couchbase; + +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.controller.ControllerService; + +import com.couchbase.client.java.Bucket; + +/** + * Provides a connection to a Couchbase Server cluster throughout a NiFi Data + * flow. + */ +@CapabilityDescription("Provides a centralized Couchbase connection.") +public interface CouchbaseClusterControllerService extends ControllerService { + + /** + * Open a bucket connection. + * @param bucketName the bucket name to access + * @return a connected bucket instance + */ + public Bucket openBucket(String bucketName); + +} diff --git a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/couchbase/CouchbaseClusterService.java b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/couchbase/CouchbaseClusterService.java new file mode 100644 index 0000000000..7daa97c023 --- /dev/null +++ b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/couchbase/CouchbaseClusterService.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.couchbase; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.nifi.annotation.behavior.DynamicProperty; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnDisabled; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.reporting.InitializationException; + +import com.couchbase.client.core.CouchbaseException; +import com.couchbase.client.java.Bucket; +import com.couchbase.client.java.CouchbaseCluster; + +/** + * Provides a centralized Couchbase connection and bucket passwords management. + */ +@CapabilityDescription("Provides a centralized Couchbase connection and bucket passwords management." + + " Bucket passwords can be specified via dynamic properties.") +@Tags({ "nosql", "couchbase", "database", "connection" }) +@DynamicProperty(name = "Bucket Password for BUCKET_NAME", value = "bucket password", description = "Specify bucket password if neseccery.") +public class CouchbaseClusterService extends AbstractControllerService implements CouchbaseClusterControllerService { + + public static final PropertyDescriptor CONNECTION_STRING = new PropertyDescriptor + .Builder().name("Connection String") + .description("The hostnames or ip addresses of the bootstraping nodes and optional parameters." + + " Syntax) couchbase://node1,node2,nodeN?param1=value1¶m2=value2¶mN=valueN") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + private static final List properties; + + static { + final List props = new ArrayList<>(); + props.add(CONNECTION_STRING); + + properties = Collections.unmodifiableList(props); + } + + private static final String DYNAMIC_PROP_BUCKET_PASSWORD = "Bucket Password for "; + private static final Map bucketPasswords = new HashMap<>(); + + private volatile CouchbaseCluster cluster; + + @Override + protected List getSupportedPropertyDescriptors() { + return properties; + } + + @Override + protected PropertyDescriptor getSupportedDynamicPropertyDescriptor( + String propertyDescriptorName) { + if(propertyDescriptorName.startsWith(DYNAMIC_PROP_BUCKET_PASSWORD)){ + return new PropertyDescriptor + .Builder().name(propertyDescriptorName) + .description("Bucket password.") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .dynamic(true) + .sensitive(true) + .build(); + } + return null; + } + + + /** + * Establish a connection to a Couchbase cluster. + * @param context the configuration context + * @throws InitializationException if unable to connect a Couchbase cluster + */ + @OnEnabled + public void onConfigured(final ConfigurationContext context) throws InitializationException { + + for(PropertyDescriptor p : context.getProperties().keySet()){ + if(p.isDynamic() && p.getName().startsWith(DYNAMIC_PROP_BUCKET_PASSWORD)){ + String bucketName = p.getName().substring(DYNAMIC_PROP_BUCKET_PASSWORD.length()); + String password = context.getProperty(p).getValue(); + bucketPasswords.put(bucketName, password); + } + } + try { + cluster = CouchbaseCluster.fromConnectionString(context.getProperty(CONNECTION_STRING).getValue()); + } catch(CouchbaseException e) { + throw new InitializationException(e); + } + } + + @Override + public Bucket openBucket(String bucketName){ + return cluster.openBucket(bucketName, bucketPasswords.get(bucketName)); + } + + /** + * Disconnect from the Couchbase cluster. + */ + @OnDisabled + public void shutdown() { + if(cluster != null){ + cluster.disconnect(); + cluster = null; + } + } + +} diff --git a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/AbstractCouchbaseProcessor.java b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/AbstractCouchbaseProcessor.java new file mode 100644 index 0000000000..d3707280fd --- /dev/null +++ b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/AbstractCouchbaseProcessor.java @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.couchbase; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.couchbase.CouchbaseClusterControllerService; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.util.StandardValidators; + +import com.couchbase.client.java.Bucket; + +/** + * Provides common functionalities for Couchbase processors. + */ +public abstract class AbstractCouchbaseProcessor extends AbstractProcessor { + + public static final PropertyDescriptor DOCUMENT_TYPE = new PropertyDescriptor + .Builder().name("Document Type") + .description("The type of contents.") + .required(true) + .allowableValues(DocumentType.values()) + .defaultValue(DocumentType.Json.toString()) + .build(); + + public static final PropertyDescriptor DOC_ID = new PropertyDescriptor + .Builder().name("Static Document Id") + .description("A static, fixed Couchbase document id.") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor DOC_ID_EXP = new PropertyDescriptor + .Builder().name("Document Id Expression") + .description("An expression to construct the Couchbase document id." + + " If 'Static Document Id' is specified, then 'Static Document Id' is used.") + .required(false) + .expressionLanguageSupported(true) + .addValidator(StandardValidators.ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR) + .build(); + + + public static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("All FlowFiles that are written to Couchbase Server are routed to this relationship.") + .build(); + public static final Relationship REL_ORIGINAL = new Relationship.Builder() + .name("original") + .description("The original input file will be routed to this destination when it has been successfully processed.") + .build(); + public static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("All FlowFiles that cannot written to Couchbase Server are routed to this relationship.") + .build(); + + public static final PropertyDescriptor COUCHBASE_CLUSTER_SERVICE = new PropertyDescriptor + .Builder().name("Couchbase Cluster Controller Service") + .description("A Couchbase Cluster Controller Service which manages connections to a Couchbase cluster.") + .required(true) + .identifiesControllerService(CouchbaseClusterControllerService.class) + .build(); + + public static final PropertyDescriptor BUCKET_NAME = new PropertyDescriptor + .Builder().name("Bucket Name") + .description("The name of bucket to access.") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .defaultValue("default") + .build(); + + private List descriptors; + + private Set relationships; + + private CouchbaseClusterControllerService clusterService; + + @Override + protected final void init(final ProcessorInitializationContext context) { + + final List descriptors = new ArrayList(); + descriptors.add(COUCHBASE_CLUSTER_SERVICE); + descriptors.add(BUCKET_NAME); + addSupportedProperties(descriptors); + this.descriptors = Collections.unmodifiableList(descriptors); + + final Set relationships = new HashSet(); + addSupportedRelationships(relationships); + this.relationships = Collections.unmodifiableSet(relationships); + + } + + /** + * Add processor specific properties. + * @param descriptors add properties to this list + */ + protected void addSupportedProperties(List descriptors) { + return; + } + + /** + * Add processor specific relationships. + * @param relationships add relationships to this list + */ + protected void addSupportedRelationships(Set relationships) { + return; + } + + @Override + public final Set getRelationships() { + return this.relationships; + } + + @Override + public final List getSupportedPropertyDescriptors() { + return descriptors; + } + + private CouchbaseClusterControllerService getClusterService(final ProcessContext context) { + if(clusterService == null){ + synchronized(AbstractCouchbaseProcessor.class){ + if(clusterService == null){ + clusterService = context.getProperty(COUCHBASE_CLUSTER_SERVICE) + .asControllerService(CouchbaseClusterControllerService.class); + } + } + } + + return clusterService; + } + + /** + * Open a bucket connection using a CouchbaseClusterControllerService. + * @param context a process context + * @return a bucket instance + */ + protected final Bucket openBucket(final ProcessContext context) { + return getClusterService(context).openBucket(context.getProperty(BUCKET_NAME).getValue()); + } + + /** + * Generate a transit url. + * @param context a process context + * @return a transit url based on the bucket name and the CouchbaseClusterControllerService name + */ + protected String getTransitUrl(final ProcessContext context) { + return new StringBuilder(context.getProperty(BUCKET_NAME).getValue()) + .append('@') + .append(context.getProperty(COUCHBASE_CLUSTER_SERVICE).getValue()) + .toString(); + } + +} diff --git a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/DocumentType.java b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/DocumentType.java new file mode 100644 index 0000000000..81dd465d7d --- /dev/null +++ b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/DocumentType.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.couchbase; + + +/** + * Supported Couchbase document types. + * + * In order to handle a variety type of document classes such as JsonDocument, + * JsonLongDocument or JsonStringDocument, Couchbase processors use + * RawJsonDocument for Json type. + * + * The distinction between Json and Binary exists because BinaryDocument doesn't + * set Json flag when it stored on Couchbase Server even if the content byte + * array represents a Json string, and it can't be retrieved as a Json document. + */ +public enum DocumentType { + + Json, + Binary + +} diff --git a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/GetCouchbaseKey.java b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/GetCouchbaseKey.java new file mode 100644 index 0000000000..6d9a476c19 --- /dev/null +++ b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/GetCouchbaseKey.java @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.couchbase; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.behavior.ReadsAttribute; +import org.apache.nifi.annotation.behavior.ReadsAttributes; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.couchbase.CouchbaseAttributes; +import org.apache.nifi.couchbase.CouchbaseClusterControllerService; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ProcessorLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.InputStreamCallback; +import org.apache.nifi.stream.io.StreamUtils; + +import com.couchbase.client.java.Bucket; +import com.couchbase.client.java.document.BinaryDocument; +import com.couchbase.client.java.document.Document; +import com.couchbase.client.java.document.RawJsonDocument; + +@Tags({ "nosql", "couchbase", "database", "get" }) +@CapabilityDescription("Get a document from Couchbase Server via Key/Value access.") +@SeeAlso({CouchbaseClusterControllerService.class}) +@ReadsAttributes({ + @ReadsAttribute(attribute = "FlowFile content", description = "Used as a document id if none of 'Static Document Id' or 'Document Id Expression' is specified"), + @ReadsAttribute(attribute = "*", description = "Any attribute can be used as part of a document id by 'Document Id Excepression.") + }) +@WritesAttributes({ + @WritesAttribute(attribute="couchbase.cluster", description="Cluster where the document was retrieved from."), + @WritesAttribute(attribute="couchbase.bucket", description="Bucket where the document was retrieved from."), + @WritesAttribute(attribute="couchbase.doc.id", description="Id of the document."), + @WritesAttribute(attribute="couchbase.doc.cas", description="CAS of the document."), + @WritesAttribute(attribute="couchbase.doc.expiry", description="Expiration of the document.") + }) +public class GetCouchbaseKey extends AbstractCouchbaseProcessor { + + @Override + protected void addSupportedProperties(List descriptors) { + descriptors.add(DOCUMENT_TYPE); + descriptors.add(DOC_ID); + descriptors.add(DOC_ID_EXP); + } + + @Override + protected void addSupportedRelationships(Set relationships) { + relationships.add(REL_SUCCESS); + relationships.add(REL_ORIGINAL); + relationships.add(REL_FAILURE); + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + final ProcessorLog logger = getLogger(); + FlowFile inFile = session.get(); + + String docId = null; + if(!StringUtils.isEmpty(context.getProperty(DOC_ID).getValue())){ + docId = context.getProperty(DOC_ID).getValue(); + } else { + // Otherwise docId has to be extracted from inFile. + if ( inFile == null ) { + return; + } + if(!StringUtils.isEmpty(context.getProperty(DOC_ID_EXP).getValue())){ + docId = context.getProperty(DOC_ID_EXP).evaluateAttributeExpressions(inFile).getValue(); + } else { + final byte[] content = new byte[(int) inFile.getSize()]; + session.read(inFile, new InputStreamCallback() { + @Override + public void process(final InputStream in) throws IOException { + StreamUtils.fillBuffer(in, content, true); + } + }); + docId = new String(content, StandardCharsets.UTF_8); + } + } + + if(StringUtils.isEmpty(docId)){ + logger.error("Couldn't get document id from from {}", new Object[]{inFile}); + session.transfer(inFile, REL_FAILURE); + } + + try { + Document doc = null; + byte[] content = null; + Bucket bucket = openBucket(context); + DocumentType documentType = DocumentType.valueOf(context.getProperty(DOCUMENT_TYPE).getValue()); + switch (documentType){ + case Json : { + RawJsonDocument document = bucket.get(docId, RawJsonDocument.class); + if(document != null){ + content = document.content().getBytes(StandardCharsets.UTF_8); + doc = document; + } + break; + } + case Binary : { + BinaryDocument document = bucket.get(docId, BinaryDocument.class); + if(document != null){ + content = document.content().array(); + doc = document; + } + break; + } + } + + if(doc == null) { + logger.info("Document {} was not found in {}", new Object[]{docId, getTransitUrl(context)}); + if(inFile != null){ + session.transfer(inFile, REL_FAILURE); + } + return; + } + + if(inFile != null){ + session.transfer(inFile, REL_ORIGINAL); + } + + FlowFile outFile = session.create(); + outFile = session.importFrom(new ByteArrayInputStream(content), outFile); + Map updatedAttrs = new HashMap<>(); + updatedAttrs.put(CouchbaseAttributes.Cluster.key(), context.getProperty(COUCHBASE_CLUSTER_SERVICE).getValue()); + updatedAttrs.put(CouchbaseAttributes.Bucket.key(), context.getProperty(BUCKET_NAME).getValue()); + updatedAttrs.put(CouchbaseAttributes.DocId.key(), docId); + updatedAttrs.put(CouchbaseAttributes.Cas.key(), String.valueOf(doc.cas())); + updatedAttrs.put(CouchbaseAttributes.Expiry.key(), String.valueOf(doc.expiry())); + outFile = session.putAllAttributes(outFile, updatedAttrs); + session.getProvenanceReporter().receive(outFile, getTransitUrl(context)); + session.transfer(outFile, REL_SUCCESS); + + } catch (Throwable t){ + logger.error("Getting docuement {} from Couchbase Server using {} failed due to {}", + new Object[]{docId, inFile, t}, t); + if(inFile != null){ + session.transfer(inFile, REL_FAILURE); + } + } + } + +} diff --git a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/PutCouchbaseKey.java b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/PutCouchbaseKey.java new file mode 100644 index 0000000000..6bfa480c7e --- /dev/null +++ b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/PutCouchbaseKey.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.couchbase; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.behavior.ReadsAttribute; +import org.apache.nifi.annotation.behavior.ReadsAttributes; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.couchbase.CouchbaseAttributes; +import org.apache.nifi.couchbase.CouchbaseClusterControllerService; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.logging.ProcessorLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.InputStreamCallback; +import org.apache.nifi.stream.io.StreamUtils; + +import com.couchbase.client.deps.io.netty.buffer.ByteBuf; +import com.couchbase.client.deps.io.netty.buffer.Unpooled; +import com.couchbase.client.java.PersistTo; +import com.couchbase.client.java.ReplicateTo; +import com.couchbase.client.java.document.BinaryDocument; +import com.couchbase.client.java.document.Document; +import com.couchbase.client.java.document.RawJsonDocument; + +@Tags({ "nosql", "couchbase", "database", "put" }) +@CapabilityDescription("Put a document to Couchbase Server via Key/Value access.") +@SeeAlso({CouchbaseClusterControllerService.class}) +@ReadsAttributes({ + @ReadsAttribute(attribute = "uuid", description = "Used as a document id if none of 'Static Document Id' or 'Document Id Expression' is specified"), + @ReadsAttribute(attribute = "*", description = "Any attribute can be used as part of a document id by 'Document Id Excepression.") + }) +@WritesAttributes({ + @WritesAttribute(attribute="couchbase.cluster", description="Cluster where the document was stored."), + @WritesAttribute(attribute="couchbase.bucket", description="Bucket where the document was stored."), + @WritesAttribute(attribute="couchbase.doc.id", description="Id of the document."), + @WritesAttribute(attribute="couchbase.doc.cas", description="CAS of the document."), + @WritesAttribute(attribute="couchbase.doc.expiry", description="Expiration of the document.") + }) +public class PutCouchbaseKey extends AbstractCouchbaseProcessor { + + + public static final PropertyDescriptor PERSIST_TO = new PropertyDescriptor + .Builder().name("Persist To") + .description("Durability constraint about disk persistence.") + .required(true) + .allowableValues(PersistTo.values()) + .defaultValue(PersistTo.NONE.toString()) + .build(); + + public static final PropertyDescriptor REPLICATE_TO = new PropertyDescriptor + .Builder().name("Replicate To") + .description("Durability constraint about replication.") + .required(true) + .allowableValues(ReplicateTo.values()) + .defaultValue(ReplicateTo.NONE.toString()) + .build(); + + @Override + protected void addSupportedProperties(List descriptors) { + descriptors.add(DOCUMENT_TYPE); + descriptors.add(DOC_ID); + descriptors.add(DOC_ID_EXP); + descriptors.add(PERSIST_TO); + descriptors.add(REPLICATE_TO); + } + + @Override + protected void addSupportedRelationships(Set relationships) { + relationships.add(REL_SUCCESS); + relationships.add(REL_FAILURE); + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + final ProcessorLog logger = getLogger(); + FlowFile flowFile = session.get(); + if ( flowFile == null ) { + return; + } + + try { + + final byte[] content = new byte[(int) flowFile.getSize()]; + session.read(flowFile, new InputStreamCallback() { + @Override + public void process(final InputStream in) throws IOException { + StreamUtils.fillBuffer(in, content, true); + } + }); + + + String docId = String.valueOf(flowFile.getAttribute(CoreAttributes.UUID.key())); + if(!StringUtils.isEmpty(context.getProperty(DOC_ID).getValue())){ + docId = context.getProperty(DOC_ID).getValue(); + } else if(!StringUtils.isEmpty(context.getProperty(DOC_ID_EXP).getValue())){ + docId = context.getProperty(DOC_ID_EXP).evaluateAttributeExpressions(flowFile).getValue(); + } + + Document doc = null; + DocumentType documentType = DocumentType.valueOf(context.getProperty(DOCUMENT_TYPE).getValue()); + switch (documentType){ + case Json : { + doc = RawJsonDocument.create(docId, new String(content, StandardCharsets.UTF_8)); + break; + } + case Binary : { + ByteBuf buf = Unpooled.copiedBuffer(content); + doc = BinaryDocument.create(docId, buf); + break; + } + } + + + PersistTo persistTo = PersistTo.valueOf(context.getProperty(PERSIST_TO).getValue()); + ReplicateTo replicateTo = ReplicateTo.valueOf(context.getProperty(REPLICATE_TO).getValue()); + doc = openBucket(context).upsert(doc, persistTo, replicateTo); + Map updatedAttrs = new HashMap<>(); + updatedAttrs.put(CouchbaseAttributes.Cluster.key(), context.getProperty(COUCHBASE_CLUSTER_SERVICE).getValue()); + updatedAttrs.put(CouchbaseAttributes.Bucket.key(), context.getProperty(BUCKET_NAME).getValue()); + updatedAttrs.put(CouchbaseAttributes.DocId.key(), docId); + updatedAttrs.put(CouchbaseAttributes.Cas.key(), String.valueOf(doc.cas())); + updatedAttrs.put(CouchbaseAttributes.Expiry.key(), String.valueOf(doc.expiry())); + flowFile = session.putAllAttributes(flowFile, updatedAttrs); + session.getProvenanceReporter().send(flowFile, getTransitUrl(context)); + session.transfer(flowFile, REL_SUCCESS); + + } catch (Throwable t) { + logger.error("Writing {} into Couchbase Server failed due to {}", new Object[]{flowFile, t}, t); + session.transfer(flowFile, REL_FAILURE); + } + } + +} diff --git a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService new file mode 100644 index 0000000000..e5e3ea707f --- /dev/null +++ b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService @@ -0,0 +1,15 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +org.apache.nifi.couchbase.CouchbaseClusterService \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor new file mode 100644 index 0000000000..1304435296 --- /dev/null +++ b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +org.apache.nifi.processors.couchbase.GetCouchbaseKey +org.apache.nifi.processors.couchbase.PutCouchbaseKey \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestCouchbaseClusterService.java b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestCouchbaseClusterService.java new file mode 100644 index 0000000000..d96b1c29bc --- /dev/null +++ b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestCouchbaseClusterService.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.couchbase; + +import org.apache.nifi.couchbase.CouchbaseClusterControllerService; +import org.apache.nifi.couchbase.CouchbaseClusterService; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + + +public class TestCouchbaseClusterService { + + private static final String SERVICE_ID = "couchbaseClusterService"; + private TestRunner testRunner; + + @Before + public void init() throws Exception { + System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "info"); + System.setProperty("org.slf4j.simpleLogger.showDateTime", "true"); + System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.processors.couchbase.PutCouchbaseKey", "debug"); + System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.couchbase.CouchbaseClusterService", "debug"); + System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.couchbase.TestCouchbaseClusterService", "debug"); + + testRunner = TestRunners.newTestRunner(PutCouchbaseKey.class); + testRunner.setValidateExpressionUsage(false); + } + + @Test + public void testConnectionFailure() throws InitializationException { + String connectionString = "couchbase://invalid-hostname"; + CouchbaseClusterControllerService service = new CouchbaseClusterService(); + testRunner.addControllerService(SERVICE_ID, service); + testRunner.setProperty(service, CouchbaseClusterService.CONNECTION_STRING, connectionString); + try { + testRunner.enableControllerService(service); + Assert.fail("The service shouldn't be enabled when it couldn't connect to a cluster."); + } catch (AssertionError e) { + } + } + +} diff --git a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestGetCouchbaseKey.java b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestGetCouchbaseKey.java new file mode 100644 index 0000000000..4ea4dffefd --- /dev/null +++ b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestGetCouchbaseKey.java @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.couchbase; + +import static org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.BUCKET_NAME; +import static org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.COUCHBASE_CLUSTER_SERVICE; +import static org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.DOCUMENT_TYPE; +import static org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.DOC_ID; +import static org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.DOC_ID_EXP; +import static org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.REL_FAILURE; +import static org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.REL_ORIGINAL; +import static org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.REL_SUCCESS; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Map; + +import org.apache.nifi.couchbase.CouchbaseAttributes; +import org.apache.nifi.couchbase.CouchbaseClusterControllerService; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Before; +import org.junit.Test; + +import com.couchbase.client.core.ServiceNotAvailableException; +import com.couchbase.client.deps.io.netty.buffer.ByteBuf; +import com.couchbase.client.deps.io.netty.buffer.Unpooled; +import com.couchbase.client.java.Bucket; +import com.couchbase.client.java.document.BinaryDocument; +import com.couchbase.client.java.document.RawJsonDocument; + + +public class TestGetCouchbaseKey { + + private static final String SERVICE_ID = "couchbaseClusterService"; + private TestRunner testRunner; + + @Before + public void init() throws Exception { + System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "info"); + System.setProperty("org.slf4j.simpleLogger.showDateTime", "true"); + System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.processors.couchbase.GetCouchbaseKey", "debug"); + System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.processors.couchbase.TestGetCouchbaseKey", "debug"); + + testRunner = TestRunners.newTestRunner(GetCouchbaseKey.class); + testRunner.setValidateExpressionUsage(false); + } + + private void setupMockBucket(Bucket bucket) throws InitializationException { + CouchbaseClusterControllerService service = mock(CouchbaseClusterControllerService.class); + when(service.getIdentifier()).thenReturn(SERVICE_ID); + when(service.openBucket(anyString())).thenReturn(bucket); + testRunner.addControllerService(SERVICE_ID, service); + testRunner.enableControllerService(service); + testRunner.setProperty(COUCHBASE_CLUSTER_SERVICE, SERVICE_ID); + } + + @Test + public void testStaticDocId() throws Exception { + String bucketName = "bucket-1"; + String docId = "doc-a"; + + Bucket bucket = mock(Bucket.class); + String content = "{\"key\":\"value\"}"; + int expiry = 100; + long cas = 200L; + when(bucket.get(docId, RawJsonDocument.class)).thenReturn(RawJsonDocument.create(docId, expiry, content, cas)); + setupMockBucket(bucket); + + testRunner.setProperty(BUCKET_NAME, bucketName); + testRunner.setProperty(DOC_ID, docId); + testRunner.run(); + + testRunner.assertAllFlowFilesTransferred(REL_SUCCESS); + testRunner.assertTransferCount(REL_SUCCESS, 1); + testRunner.assertTransferCount(REL_FAILURE, 0); + MockFlowFile outFile = testRunner.getFlowFilesForRelationship(REL_SUCCESS).get(0); + outFile.assertContentEquals(content); + + outFile.assertAttributeEquals(CouchbaseAttributes.Cluster.key(), SERVICE_ID); + outFile.assertAttributeEquals(CouchbaseAttributes.Bucket.key(), bucketName); + outFile.assertAttributeEquals(CouchbaseAttributes.DocId.key(), docId); + outFile.assertAttributeEquals(CouchbaseAttributes.Cas.key(), String.valueOf(cas)); + outFile.assertAttributeEquals(CouchbaseAttributes.Expiry.key(), String.valueOf(expiry)); + } + + + /** + * Use static document id even if doc id expression is set. + */ + @Test + public void testStaticDocIdAndDocIdExp() throws Exception { + String docId = "doc-a"; + String docIdExp = "${someProperty}"; + + Bucket bucket = mock(Bucket.class); + String content = "{\"key\":\"value\"}"; + when(bucket.get(docId, RawJsonDocument.class)).thenReturn(RawJsonDocument.create(docId, content)); + setupMockBucket(bucket); + + testRunner.setProperty(DOC_ID, docId); + testRunner.setProperty(DOC_ID_EXP, docIdExp); + testRunner.run(); + + testRunner.assertAllFlowFilesTransferred(REL_SUCCESS); + testRunner.assertTransferCount(REL_SUCCESS, 1); + testRunner.assertTransferCount(REL_FAILURE, 0); + MockFlowFile outFile = testRunner.getFlowFilesForRelationship(REL_SUCCESS).get(0); + outFile.assertContentEquals(content); + } + + @Test + public void testDocIdExp() throws Exception { + String docIdExp = "${'someProperty'}"; + String somePropertyValue = "doc-p"; + + Bucket bucket = mock(Bucket.class); + String content = "{\"key\":\"value\"}"; + when(bucket.get(somePropertyValue, RawJsonDocument.class)) + .thenReturn(RawJsonDocument.create(somePropertyValue, content)); + setupMockBucket(bucket); + + testRunner.setProperty(DOC_ID_EXP, docIdExp); + + byte[] inFileData = "input FlowFile data".getBytes(StandardCharsets.UTF_8); + Map properties = new HashMap<>(); + properties.put("someProperty", somePropertyValue); + testRunner.enqueue(inFileData, properties); + testRunner.run(); + + testRunner.assertTransferCount(REL_SUCCESS, 1); + testRunner.assertTransferCount(REL_ORIGINAL, 1); + testRunner.assertTransferCount(REL_FAILURE, 0); + MockFlowFile outFile = testRunner.getFlowFilesForRelationship(REL_SUCCESS).get(0); + outFile.assertContentEquals(content); + } + + @Test + public void testInputFlowFileContent() throws Exception { + + Bucket bucket = mock(Bucket.class); + String inFileDataStr = "doc-in"; + String content = "{\"key\":\"value\"}"; + when(bucket.get(inFileDataStr, RawJsonDocument.class)) + .thenReturn(RawJsonDocument.create(inFileDataStr, content)); + setupMockBucket(bucket); + + + byte[] inFileData = inFileDataStr.getBytes(StandardCharsets.UTF_8); + testRunner.enqueue(inFileData); + testRunner.run(); + + testRunner.assertTransferCount(REL_SUCCESS, 1); + testRunner.assertTransferCount(REL_ORIGINAL, 1); + testRunner.assertTransferCount(REL_FAILURE, 0); + MockFlowFile outFile = testRunner.getFlowFilesForRelationship(REL_SUCCESS).get(0); + outFile.assertContentEquals(content); + } + + @Test + public void testBinaryDocument() throws Exception { + + Bucket bucket = mock(Bucket.class); + String inFileDataStr = "doc-in"; + String content = "binary"; + ByteBuf buf = Unpooled.copiedBuffer(content.getBytes(StandardCharsets.UTF_8)); + when(bucket.get(inFileDataStr, BinaryDocument.class)) + .thenReturn(BinaryDocument.create(inFileDataStr, buf)); + setupMockBucket(bucket); + + + byte[] inFileData = inFileDataStr.getBytes(StandardCharsets.UTF_8); + testRunner.enqueue(inFileData); + testRunner.setProperty(DOCUMENT_TYPE, DocumentType.Binary.toString()); + testRunner.run(); + + testRunner.assertTransferCount(REL_SUCCESS, 1); + testRunner.assertTransferCount(REL_ORIGINAL, 1); + testRunner.assertTransferCount(REL_FAILURE, 0); + MockFlowFile outFile = testRunner.getFlowFilesForRelationship(REL_SUCCESS).get(0); + outFile.assertContentEquals(content); + } + + + @Test + public void testCouchbaseFailure() throws Exception { + + Bucket bucket = mock(Bucket.class); + String inFileDataStr = "doc-in"; + when(bucket.get(inFileDataStr, RawJsonDocument.class)) + .thenThrow(new ServiceNotAvailableException()); + setupMockBucket(bucket); + + + byte[] inFileData = inFileDataStr.getBytes(StandardCharsets.UTF_8); + testRunner.enqueue(inFileData); + testRunner.run(); + + testRunner.assertTransferCount(REL_SUCCESS, 0); + testRunner.assertTransferCount(REL_ORIGINAL, 0); + testRunner.assertTransferCount(REL_FAILURE, 1); + MockFlowFile outFile = testRunner.getFlowFilesForRelationship(REL_FAILURE).get(0); + outFile.assertContentEquals(inFileDataStr); + } +} diff --git a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestPutCouchbaseKey.java b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestPutCouchbaseKey.java new file mode 100644 index 0000000000..39955286f7 --- /dev/null +++ b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestPutCouchbaseKey.java @@ -0,0 +1,254 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.couchbase; + +import static org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.BUCKET_NAME; +import static org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.COUCHBASE_CLUSTER_SERVICE; +import static org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.DOC_ID; +import static org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.DOC_ID_EXP; +import static org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.REL_FAILURE; +import static org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.REL_SUCCESS; +import static org.junit.Assert.assertEquals; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Map; + +import org.apache.nifi.couchbase.CouchbaseAttributes; +import org.apache.nifi.couchbase.CouchbaseClusterControllerService; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; + +import com.couchbase.client.java.Bucket; +import com.couchbase.client.java.PersistTo; +import com.couchbase.client.java.ReplicateTo; +import com.couchbase.client.java.document.RawJsonDocument; +import com.couchbase.client.java.error.DurabilityException; + + +public class TestPutCouchbaseKey { + + private static final String SERVICE_ID = "couchbaseClusterService"; + private TestRunner testRunner; + + @Before + public void init() throws Exception { + System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "info"); + System.setProperty("org.slf4j.simpleLogger.showDateTime", "true"); + System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.processors.couchbase.PutCouchbaseKey", "debug"); + System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.processors.couchbase.TestPutCouchbaseKey", "debug"); + + testRunner = TestRunners.newTestRunner(PutCouchbaseKey.class); + testRunner.setValidateExpressionUsage(false); + } + + private void setupMockBucket(Bucket bucket) throws InitializationException { + CouchbaseClusterControllerService service = mock(CouchbaseClusterControllerService.class); + when(service.getIdentifier()).thenReturn(SERVICE_ID); + when(service.openBucket(anyString())).thenReturn(bucket); + testRunner.addControllerService(SERVICE_ID, service); + testRunner.enableControllerService(service); + testRunner.setProperty(COUCHBASE_CLUSTER_SERVICE, SERVICE_ID); + } + + @Test + public void testStaticDocId() throws Exception { + String bucketName = "bucket-1"; + String docId = "doc-a"; + int expiry = 100; + long cas = 200L; + + String inFileData = "{\"key\":\"value\"}"; + byte[] inFileDataBytes = inFileData.getBytes(StandardCharsets.UTF_8); + + Bucket bucket = mock(Bucket.class); + when(bucket.upsert(any(RawJsonDocument.class), eq(PersistTo.NONE), eq(ReplicateTo.NONE))) + .thenReturn(RawJsonDocument.create(docId, expiry, inFileData, cas)); + setupMockBucket(bucket); + + testRunner.enqueue(inFileDataBytes); + testRunner.setProperty(BUCKET_NAME, bucketName); + testRunner.setProperty(DOC_ID, docId); + testRunner.run(); + + verify(bucket, times(1)).upsert(any(RawJsonDocument.class), eq(PersistTo.NONE), eq(ReplicateTo.NONE)); + + testRunner.assertAllFlowFilesTransferred(REL_SUCCESS); + testRunner.assertTransferCount(REL_SUCCESS, 1); + testRunner.assertTransferCount(REL_FAILURE, 0); + MockFlowFile outFile = testRunner.getFlowFilesForRelationship(REL_SUCCESS).get(0); + outFile.assertContentEquals(inFileData); + outFile.assertAttributeEquals(CouchbaseAttributes.Cluster.key(), SERVICE_ID); + outFile.assertAttributeEquals(CouchbaseAttributes.Bucket.key(), bucketName); + outFile.assertAttributeEquals(CouchbaseAttributes.DocId.key(), docId); + outFile.assertAttributeEquals(CouchbaseAttributes.Cas.key(), String.valueOf(cas)); + outFile.assertAttributeEquals(CouchbaseAttributes.Expiry.key(), String.valueOf(expiry)); + } + + @Test + public void testDurabilityConstraint() throws Exception { + String docId = "doc-a"; + + String inFileData = "{\"key\":\"value\"}"; + byte[] inFileDataBytes = inFileData.getBytes(StandardCharsets.UTF_8); + + Bucket bucket = mock(Bucket.class); + when(bucket.upsert(any(RawJsonDocument.class), eq(PersistTo.MASTER), eq(ReplicateTo.ONE))) + .thenReturn(RawJsonDocument.create(docId, inFileData)); + setupMockBucket(bucket); + + testRunner.enqueue(inFileDataBytes); + testRunner.setProperty(DOC_ID, docId); + testRunner.setProperty(PutCouchbaseKey.PERSIST_TO, PersistTo.MASTER.toString()); + testRunner.setProperty(PutCouchbaseKey.REPLICATE_TO, ReplicateTo.ONE.toString()); + testRunner.run(); + + verify(bucket, times(1)).upsert(any(RawJsonDocument.class), eq(PersistTo.MASTER), eq(ReplicateTo.ONE)); + + testRunner.assertAllFlowFilesTransferred(REL_SUCCESS); + testRunner.assertTransferCount(REL_SUCCESS, 1); + testRunner.assertTransferCount(REL_FAILURE, 0); + MockFlowFile outFile = testRunner.getFlowFilesForRelationship(REL_SUCCESS).get(0); + outFile.assertContentEquals(inFileData); + } + + /** + * Use static document id even if doc id expression is set. + */ + @Test + public void testStaticDocIdAndDocIdExp() throws Exception { + String docId = "doc-a"; + String docIdExp = "${someProperty}"; + + String inFileData = "{\"key\":\"value\"}"; + byte[] inFileDataBytes = inFileData.getBytes(StandardCharsets.UTF_8); + + Bucket bucket = mock(Bucket.class); + when(bucket.upsert(any(RawJsonDocument.class), eq(PersistTo.NONE), eq(ReplicateTo.NONE))) + .thenReturn(RawJsonDocument.create(docId, inFileData)); + setupMockBucket(bucket); + + testRunner.enqueue(inFileDataBytes); + testRunner.setProperty(DOC_ID, docId); + testRunner.setProperty(DOC_ID_EXP, docIdExp); + testRunner.run(); + + verify(bucket, times(1)).upsert(any(RawJsonDocument.class), eq(PersistTo.NONE), eq(ReplicateTo.NONE)); + + testRunner.assertAllFlowFilesTransferred(REL_SUCCESS); + testRunner.assertTransferCount(REL_SUCCESS, 1); + testRunner.assertTransferCount(REL_FAILURE, 0); + MockFlowFile outFile = testRunner.getFlowFilesForRelationship(REL_SUCCESS).get(0); + outFile.assertContentEquals(inFileData); + } + + @Test + public void testDocIdExp() throws Exception { + String docIdExp = "${'someProperty'}"; + String somePropertyValue = "doc-p"; + + String inFileData = "{\"key\":\"value\"}"; + byte[] inFileDataBytes = inFileData.getBytes(StandardCharsets.UTF_8); + + Bucket bucket = mock(Bucket.class); + when(bucket.upsert(any(RawJsonDocument.class), eq(PersistTo.NONE), eq(ReplicateTo.NONE))) + .thenReturn(RawJsonDocument.create(somePropertyValue, inFileData)); + setupMockBucket(bucket); + + testRunner.setProperty(DOC_ID_EXP, docIdExp); + + Map properties = new HashMap<>(); + properties.put("someProperty", somePropertyValue); + testRunner.enqueue(inFileDataBytes, properties); + testRunner.run(); + + verify(bucket, times(1)).upsert(any(RawJsonDocument.class), eq(PersistTo.NONE), eq(ReplicateTo.NONE)); + + testRunner.assertTransferCount(REL_SUCCESS, 1); + testRunner.assertTransferCount(REL_FAILURE, 0); + MockFlowFile outFile = testRunner.getFlowFilesForRelationship(REL_SUCCESS).get(0); + outFile.assertContentEquals(inFileData); + } + + @Test + public void testInputFlowFileUuid() throws Exception { + + String uuid = "00029362-5106-40e8-b8a9-bf2cecfbc0d7"; + String inFileData = "{\"key\":\"value\"}"; + byte[] inFileDataBytes = inFileData.getBytes(StandardCharsets.UTF_8); + + Bucket bucket = mock(Bucket.class); + when(bucket.upsert(any(RawJsonDocument.class), eq(PersistTo.NONE), eq(ReplicateTo.NONE))) + .thenReturn(RawJsonDocument.create(uuid, inFileData)); + setupMockBucket(bucket); + + Map properties = new HashMap<>(); + properties.put(CoreAttributes.UUID.key(), uuid); + testRunner.enqueue(inFileDataBytes, properties); + testRunner.run(); + + ArgumentCaptor capture = ArgumentCaptor.forClass(RawJsonDocument.class); + verify(bucket, times(1)).upsert(capture.capture(), eq(PersistTo.NONE), eq(ReplicateTo.NONE)); + assertEquals(uuid, capture.getValue().id()); + + testRunner.assertTransferCount(REL_SUCCESS, 1); + testRunner.assertTransferCount(REL_FAILURE, 0); + MockFlowFile outFile = testRunner.getFlowFilesForRelationship(REL_SUCCESS).get(0); + outFile.assertContentEquals(inFileData); + } + + + @Test + public void testCouchbaseFailure() throws Exception { + + String docId = "doc-a"; + + String inFileData = "{\"key\":\"value\"}"; + byte[] inFileDataBytes = inFileData.getBytes(StandardCharsets.UTF_8); + + Bucket bucket = mock(Bucket.class); + when(bucket.upsert(any(RawJsonDocument.class), eq(PersistTo.NONE), eq(ReplicateTo.ONE))) + .thenThrow(new DurabilityException()); + setupMockBucket(bucket); + + testRunner.enqueue(inFileDataBytes); + testRunner.setProperty(DOC_ID, docId); + testRunner.setProperty(PutCouchbaseKey.REPLICATE_TO, ReplicateTo.ONE.toString()); + testRunner.run(); + + verify(bucket, times(1)).upsert(any(RawJsonDocument.class), eq(PersistTo.NONE), eq(ReplicateTo.ONE)); + + testRunner.assertAllFlowFilesTransferred(REL_FAILURE); + testRunner.assertTransferCount(REL_SUCCESS, 0); + testRunner.assertTransferCount(REL_FAILURE, 1); + MockFlowFile outFile = testRunner.getFlowFilesForRelationship(REL_FAILURE).get(0); + outFile.assertContentEquals(inFileData); + } +} diff --git a/nifi-nar-bundles/nifi-couchbase-bundle/pom.xml b/nifi-nar-bundles/nifi-couchbase-bundle/pom.xml new file mode 100644 index 0000000000..36542955b0 --- /dev/null +++ b/nifi-nar-bundles/nifi-couchbase-bundle/pom.xml @@ -0,0 +1,35 @@ + + + + 4.0.0 + + + org.apache.nifi + nifi-nar-bundles + 0.3.1-SNAPSHOT + + + org.apache.nifi + nifi-couchbase-bundle + 0.3.1-SNAPSHOT + pom + + + nifi-couchbase-processors + nifi-couchbase-nar + + + diff --git a/nifi-nar-bundles/pom.xml b/nifi-nar-bundles/pom.xml index d51c9b6901..841818aeaf 100644 --- a/nifi-nar-bundles/pom.xml +++ b/nifi-nar-bundles/pom.xml @@ -45,6 +45,7 @@ nifi-ambari-bundle nifi-image-bundle nifi-avro-bundle + nifi-couchbase-bundle diff --git a/pom.xml b/pom.xml index 7f68c329d2..1d5a8570dd 100644 --- a/pom.xml +++ b/pom.xml @@ -906,6 +906,12 @@ 0.3.1-SNAPSHOT nar + + org.apache.nifi + nifi-couchbase-nar + 0.3.1-SNAPSHOT + nar + org.apache.nifi nifi-properties