NIFI-1109 Updated documentation and cleaned up code

Reviewed by Bryan Bende (bbende@apache.org).
Committed with amendments (for whitespace, a couple misspellings and to change a test method name based on review) by Tony Kurc (tkurc@apache.org)
This commit is contained in:
Mark Payne 2015-11-12 21:01:50 -05:00 committed by Tony Kurc
parent 02102ea1c2
commit 65bd8c0b1f
5 changed files with 168 additions and 150 deletions

View File

@ -43,18 +43,15 @@ import com.couchbase.client.java.Bucket;
*/
public abstract class AbstractCouchbaseProcessor extends AbstractProcessor {
public static final PropertyDescriptor DOCUMENT_TYPE = new PropertyDescriptor
.Builder().name("Document Type")
public static final PropertyDescriptor DOCUMENT_TYPE = new PropertyDescriptor.Builder().name("Document Type")
.description("The type of contents.")
.required(true)
.allowableValues(DocumentType.values())
.defaultValue(DocumentType.Json.toString())
.build();
public static final PropertyDescriptor DOC_ID = new PropertyDescriptor
.Builder().name("Document Id")
.description("A static, fixed Couchbase document id."
+ "Or an expression to construct the Couchbase document id.")
public static final PropertyDescriptor DOC_ID = new PropertyDescriptor.Builder().name("Document Id")
.description("A static, fixed Couchbase document id, or an expression to construct the Couchbase document id.")
.expressionLanguageSupported(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
@ -77,15 +74,13 @@ public abstract class AbstractCouchbaseProcessor extends AbstractProcessor {
.description("All FlowFiles that cannot written to Couchbase Server and can't be retried are routed to this relationship.")
.build();
public static final PropertyDescriptor COUCHBASE_CLUSTER_SERVICE = new PropertyDescriptor
.Builder().name("Couchbase Cluster Controller Service")
public static final PropertyDescriptor COUCHBASE_CLUSTER_SERVICE = new PropertyDescriptor.Builder().name("Couchbase Cluster Controller Service")
.description("A Couchbase Cluster Controller Service which manages connections to a Couchbase cluster.")
.required(true)
.identifiesControllerService(CouchbaseClusterControllerService.class)
.build();
public static final PropertyDescriptor BUCKET_NAME = new PropertyDescriptor
.Builder().name("Bucket Name")
public static final PropertyDescriptor BUCKET_NAME = new PropertyDescriptor.Builder().name("Bucket Name")
.description("The name of bucket to access.")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
@ -115,6 +110,7 @@ public abstract class AbstractCouchbaseProcessor extends AbstractProcessor {
/**
* Add processor specific properties.
*
* @param descriptors add properties to this list
*/
protected void addSupportedProperties(List<PropertyDescriptor> descriptors) {
@ -123,6 +119,7 @@ public abstract class AbstractCouchbaseProcessor extends AbstractProcessor {
/**
* Add processor specific relationships.
*
* @param relationships add relationships to this list
*/
protected void addSupportedRelationships(Set<Relationship> relationships) {
@ -154,6 +151,7 @@ public abstract class AbstractCouchbaseProcessor extends AbstractProcessor {
/**
* Open a bucket connection using a CouchbaseClusterControllerService.
*
* @param context a process context
* @return a bucket instance
*/
@ -163,18 +161,17 @@ public abstract class AbstractCouchbaseProcessor extends AbstractProcessor {
/**
* Generate a transit url.
*
* @param context a process context
* @return a transit url based on the bucket name and the CouchbaseClusterControllerService name
*/
protected String getTransitUrl(final ProcessContext context) {
return new StringBuilder(context.getProperty(BUCKET_NAME).getValue())
.append('@')
.append(context.getProperty(COUCHBASE_CLUSTER_SERVICE).getValue())
.toString();
protected String getTransitUrl(final ProcessContext context, final String docId) {
return "couchbase://" + context.getProperty(BUCKET_NAME).getValue() + "/" + docId;
}
/**
* Handles the thrown CocuhbaseException accordingly.
* Handles the thrown CouchbaseException accordingly.
*
* @param context a process context
* @param session a process session
* @param logger a logger
@ -190,11 +187,15 @@ public abstract class AbstractCouchbaseProcessor extends AbstractProcessor {
ErrorHandlingStrategy strategy = CouchbaseExceptionMappings.getStrategy(e);
switch (strategy.penalty()) {
case Penalize:
if(logger.isDebugEnabled()) logger.debug("Penalized: {}", new Object[]{inFile});
if (logger.isDebugEnabled()) {
logger.debug("Penalized: {}", new Object[] {inFile});
}
inFile = session.penalize(inFile);
break;
case Yield:
if(logger.isDebugEnabled()) logger.debug("Yielded context: {}", new Object[]{inFile});
if (logger.isDebugEnabled()) {
logger.debug("Yielded context: {}", new Object[] {inFile});
}
context.yield();
break;
case None:

View File

@ -16,18 +16,19 @@
*/
package org.apache.nifi.processors.couchbase;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.ReadsAttributes;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
@ -43,6 +44,7 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.stream.io.StreamUtils;
import com.couchbase.client.core.CouchbaseException;
@ -53,12 +55,11 @@ import com.couchbase.client.java.document.RawJsonDocument;
import com.couchbase.client.java.error.DocumentDoesNotExistException;
@Tags({"nosql", "couchbase", "database", "get"})
@CapabilityDescription("Get a document from Couchbase Server via Key/Value access. This processor can be triggered by an incoming FlowFile, or it can be scheduled on a timer")
@InputRequirement(Requirement.INPUT_REQUIRED)
@CapabilityDescription("Get a document from Couchbase Server via Key/Value access. The ID of the document to fetch may be supplied by setting the <Document Id> property. "
+ "NOTE: if the Document Id property is not set, the contents of the FlowFile will be read to determine the Document Id, which means that the contents of the entire "
+ "FlowFile will be buffered in memory.")
@SeeAlso({CouchbaseClusterControllerService.class})
@ReadsAttributes({
@ReadsAttribute(attribute = "FlowFile content", description = "Used as a document id if 'Document Id' is not specified"),
@ReadsAttribute(attribute = "*", description = "Any attribute can be used as part of a document id by 'Document Id' expression.")
})
@WritesAttributes({
@WritesAttribute(attribute = "couchbase.cluster", description = "Cluster where the document was retrieved from."),
@WritesAttribute(attribute = "couchbase.bucket", description = "Bucket where the document was retrieved from."),
@ -70,13 +71,13 @@ import com.couchbase.client.java.error.DocumentDoesNotExistException;
public class GetCouchbaseKey extends AbstractCouchbaseProcessor {
@Override
protected void addSupportedProperties(List<PropertyDescriptor> descriptors) {
protected void addSupportedProperties(final List<PropertyDescriptor> descriptors) {
descriptors.add(DOCUMENT_TYPE);
descriptors.add(DOC_ID);
}
@Override
protected void addSupportedRelationships(Set<Relationship> relationships) {
protected void addSupportedRelationships(final Set<Relationship> relationships) {
relationships.add(REL_SUCCESS);
relationships.add(REL_ORIGINAL);
relationships.add(REL_RETRY);
@ -85,9 +86,13 @@ public class GetCouchbaseKey extends AbstractCouchbaseProcessor {
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
final ProcessorLog logger = getLogger();
FlowFile inFile = session.get();
if (inFile == null) {
return;
}
final long startNanos = System.nanoTime();
final ProcessorLog logger = getLogger();
String docId = null;
if (!StringUtils.isEmpty(context.getProperty(DOC_ID).getValue())) {
docId = context.getProperty(DOC_ID).evaluateAttributeExpressions(inFile).getValue();
@ -103,20 +108,22 @@ public class GetCouchbaseKey extends AbstractCouchbaseProcessor {
}
if (StringUtils.isEmpty(docId)) {
if(inFile != null){
throw new ProcessException("Please check 'Document Id' setting. Couldn't get document id from " + inFile);
}
}
try {
Document<?> doc = null;
byte[] content = null;
Bucket bucket = openBucket(context);
DocumentType documentType = DocumentType.valueOf(context.getProperty(DOCUMENT_TYPE).getValue());
final Document<?> doc;
final byte[] content;
final Bucket bucket = openBucket(context);
final DocumentType documentType = DocumentType.valueOf(context.getProperty(DOCUMENT_TYPE).getValue());
switch (documentType) {
case Json: {
RawJsonDocument document = bucket.get(docId, RawJsonDocument.class);
if(document != null){
if (document == null) {
doc = null;
content = null;
} else {
content = document.content().getBytes(StandardCharsets.UTF_8);
doc = document;
}
@ -124,41 +131,50 @@ public class GetCouchbaseKey extends AbstractCouchbaseProcessor {
}
case Binary: {
BinaryDocument document = bucket.get(docId, BinaryDocument.class);
if(document != null){
if (document == null) {
doc = null;
content = null;
} else {
content = document.content().array();
doc = document;
}
break;
}
default: {
doc = null;
content = null;
}
}
if (doc == null) {
logger.warn("Document {} was not found in {}", new Object[]{docId, getTransitUrl(context)});
if(inFile != null){
logger.error("Document {} was not found in {}; routing {} to failure", new Object[] {docId, getTransitUrl(context, docId), inFile});
inFile = session.putAttribute(inFile, CouchbaseAttributes.Exception.key(), DocumentDoesNotExistException.class.getName());
session.transfer(inFile, REL_FAILURE);
}
return;
}
if(inFile != null){
session.transfer(inFile, REL_ORIGINAL);
FlowFile outFile = session.create(inFile);
outFile = session.write(outFile, new OutputStreamCallback() {
@Override
public void process(final OutputStream out) throws IOException {
out.write(content);
}
});
FlowFile outFile = session.create();
outFile = session.importFrom(new ByteArrayInputStream(content), outFile);
Map<String, String> updatedAttrs = new HashMap<>();
final Map<String, String> updatedAttrs = new HashMap<>();
updatedAttrs.put(CouchbaseAttributes.Cluster.key(), context.getProperty(COUCHBASE_CLUSTER_SERVICE).getValue());
updatedAttrs.put(CouchbaseAttributes.Bucket.key(), context.getProperty(BUCKET_NAME).getValue());
updatedAttrs.put(CouchbaseAttributes.DocId.key(), docId);
updatedAttrs.put(CouchbaseAttributes.Cas.key(), String.valueOf(doc.cas()));
updatedAttrs.put(CouchbaseAttributes.Expiry.key(), String.valueOf(doc.expiry()));
outFile = session.putAllAttributes(outFile, updatedAttrs);
session.getProvenanceReporter().receive(outFile, getTransitUrl(context));
session.transfer(outFile, REL_SUCCESS);
} catch (CouchbaseException e){
String errMsg = String.format("Getting docuement %s from Couchbase Server using %s failed due to %s", docId, inFile, e);
final long fetchMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
session.getProvenanceReporter().fetch(outFile, getTransitUrl(context, docId), fetchMillis);
session.transfer(outFile, REL_SUCCESS);
session.transfer(inFile, REL_ORIGINAL);
} catch (final CouchbaseException e) {
String errMsg = String.format("Getting document %s from Couchbase Server using %s failed due to %s", docId, inFile, e);
handleCouchbaseException(context, session, logger, inFile, e, errMsg);
}
}

View File

@ -59,7 +59,6 @@ import com.couchbase.client.java.document.RawJsonDocument;
@SeeAlso({CouchbaseClusterControllerService.class})
@ReadsAttributes({
@ReadsAttribute(attribute = "uuid", description = "Used as a document id if 'Document Id' is not specified"),
@ReadsAttribute(attribute = "*", description = "Any attribute can be used as part of a document id by 'Document Id' expression.")
})
@WritesAttributes({
@WritesAttribute(attribute = "couchbase.cluster", description = "Cluster where the document was stored."),
@ -72,16 +71,14 @@ import com.couchbase.client.java.document.RawJsonDocument;
public class PutCouchbaseKey extends AbstractCouchbaseProcessor {
public static final PropertyDescriptor PERSIST_TO = new PropertyDescriptor
.Builder().name("Persist To")
public static final PropertyDescriptor PERSIST_TO = new PropertyDescriptor.Builder().name("Persist To")
.description("Durability constraint about disk persistence.")
.required(true)
.allowableValues(PersistTo.values())
.defaultValue(PersistTo.NONE.toString())
.build();
public static final PropertyDescriptor REPLICATE_TO = new PropertyDescriptor
.Builder().name("Replicate To")
public static final PropertyDescriptor REPLICATE_TO = new PropertyDescriptor.Builder().name("Replicate To")
.description("Durability constraint about replication.")
.required(true)
.allowableValues(ReplicateTo.values())
@ -119,41 +116,42 @@ public class PutCouchbaseKey extends AbstractCouchbaseProcessor {
}
});
String docId = String.valueOf(flowFile.getAttribute(CoreAttributes.UUID.key()));
String docId = flowFile.getAttribute(CoreAttributes.UUID.key());
if (!StringUtils.isEmpty(context.getProperty(DOC_ID).getValue())) {
docId = context.getProperty(DOC_ID).evaluateAttributeExpressions(flowFile).getValue();
}
try {
Document<?> doc = null;
DocumentType documentType = DocumentType.valueOf(context.getProperty(DOCUMENT_TYPE).getValue());
final DocumentType documentType = DocumentType.valueOf(context.getProperty(DOCUMENT_TYPE).getValue());
switch (documentType) {
case Json: {
doc = RawJsonDocument.create(docId, new String(content, StandardCharsets.UTF_8));
break;
}
case Binary: {
ByteBuf buf = Unpooled.copiedBuffer(content);
final ByteBuf buf = Unpooled.copiedBuffer(content);
doc = BinaryDocument.create(docId, buf);
break;
}
}
PersistTo persistTo = PersistTo.valueOf(context.getProperty(PERSIST_TO).getValue());
ReplicateTo replicateTo = ReplicateTo.valueOf(context.getProperty(REPLICATE_TO).getValue());
final PersistTo persistTo = PersistTo.valueOf(context.getProperty(PERSIST_TO).getValue());
final ReplicateTo replicateTo = ReplicateTo.valueOf(context.getProperty(REPLICATE_TO).getValue());
doc = openBucket(context).upsert(doc, persistTo, replicateTo);
Map<String, String> updatedAttrs = new HashMap<>();
final Map<String, String> updatedAttrs = new HashMap<>();
updatedAttrs.put(CouchbaseAttributes.Cluster.key(), context.getProperty(COUCHBASE_CLUSTER_SERVICE).getValue());
updatedAttrs.put(CouchbaseAttributes.Bucket.key(), context.getProperty(BUCKET_NAME).getValue());
updatedAttrs.put(CouchbaseAttributes.DocId.key(), docId);
updatedAttrs.put(CouchbaseAttributes.Cas.key(), String.valueOf(doc.cas()));
updatedAttrs.put(CouchbaseAttributes.Expiry.key(), String.valueOf(doc.expiry()));
flowFile = session.putAllAttributes(flowFile, updatedAttrs);
session.getProvenanceReporter().send(flowFile, getTransitUrl(context));
session.transfer(flowFile, REL_SUCCESS);
} catch (CouchbaseException e) {
String errMsg = String.format("Writing docuement %s to Couchbase Server using %s failed due to %s", docId, flowFile, e);
flowFile = session.putAllAttributes(flowFile, updatedAttrs);
session.getProvenanceReporter().send(flowFile, getTransitUrl(context, docId));
session.transfer(flowFile, REL_SUCCESS);
} catch (final CouchbaseException e) {
String errMsg = String.format("Writing document %s to Couchbase Server using %s failed due to %s", docId, flowFile, e);
handleCouchbaseException(context, session, logger, flowFile, e, errMsg);
}
}

View File

@ -100,10 +100,11 @@ public class TestGetCouchbaseKey {
testRunner.setProperty(BUCKET_NAME, bucketName);
testRunner.setProperty(DOC_ID, docId);
testRunner.enqueue(new byte[0]);
testRunner.run();
testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
testRunner.assertTransferCount(REL_SUCCESS, 1);
testRunner.assertTransferCount(REL_ORIGINAL, 1);
testRunner.assertTransferCount(REL_RETRY, 0);
testRunner.assertTransferCount(REL_FAILURE, 0);
MockFlowFile outFile = testRunner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
@ -145,7 +146,7 @@ public class TestGetCouchbaseKey {
}
@Test
public void testDocIdExpWithNullFlowFile() throws Exception {
public void testDocIdExpWithEmptyFlowFile() throws Exception {
String docIdExp = "doc-s";
String docId = "doc-s";
@ -157,10 +158,11 @@ public class TestGetCouchbaseKey {
testRunner.setProperty(DOC_ID, docIdExp);
testRunner.enqueue(new byte[0]);
testRunner.run();
testRunner.assertTransferCount(REL_SUCCESS, 1);
testRunner.assertTransferCount(REL_ORIGINAL, 0);
testRunner.assertTransferCount(REL_ORIGINAL, 1);
testRunner.assertTransferCount(REL_RETRY, 0);
testRunner.assertTransferCount(REL_FAILURE, 0);
MockFlowFile outFile = testRunner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
@ -179,6 +181,7 @@ public class TestGetCouchbaseKey {
setupMockBucket(bucket);
testRunner.setProperty(DOC_ID, docIdExp);
testRunner.enqueue(new byte[0]);
try {
testRunner.run();