Merge branch 'master' into NIFI-1156

Jeremy Dyer 2015-11-13 15:02:23 -05:00
commit f8d26d2059
19 changed files with 1272 additions and 692 deletions

View File

@ -31,7 +31,12 @@ public interface FlowFile extends Comparable<FlowFile> {
/**
* @return the unique identifier for this flow file
* @deprecated This method has been deprecated in favor of using the attribute
* {@link org.apache.nifi.flowfile.attributes.CoreAttributes.UUID CoreAttributes.UUID}.
* If an identifier is needed use {@link #getAttribute(String)} to retrieve the value for this attribute.
* For example, by calling getAttribute(CoreAttributes.UUID.getKey()).
*/
@Deprecated
long getId();
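
As the deprecation note above says, the identifier should now be read through the UUID core attribute. A minimal sketch of the recommended replacement, assuming a NiFi FlowFile instance is in scope (the helper class name is hypothetical):

import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;

// Hypothetical helper showing the migration away from the deprecated getId().
final class FlowFileIds {
    private FlowFileIds() {
    }

    // Preferred: the globally unique identifier carried in the "uuid" core attribute.
    static String uuidOf(final FlowFile flowFile) {
        return flowFile.getAttribute(CoreAttributes.UUID.getKey());
    }
}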
/**

View File

@ -43,21 +43,18 @@ import com.couchbase.client.java.Bucket;
*/
public abstract class AbstractCouchbaseProcessor extends AbstractProcessor {
public static final PropertyDescriptor DOCUMENT_TYPE = new PropertyDescriptor
.Builder().name("Document Type")
.description("The type of contents.")
.required(true)
.allowableValues(DocumentType.values())
.defaultValue(DocumentType.Json.toString())
.build();
public static final PropertyDescriptor DOCUMENT_TYPE = new PropertyDescriptor.Builder().name("Document Type")
.description("The type of contents.")
.required(true)
.allowableValues(DocumentType.values())
.defaultValue(DocumentType.Json.toString())
.build();
public static final PropertyDescriptor DOC_ID = new PropertyDescriptor
.Builder().name("Document Id")
.description("A static, fixed Couchbase document id."
+ "Or an expression to construct the Couchbase document id.")
.expressionLanguageSupported(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor DOC_ID = new PropertyDescriptor.Builder().name("Document Id")
.description("A static, fixed Couchbase document id, or an expression to construct the Couchbase document id.")
.expressionLanguageSupported(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final Relationship REL_SUCCESS = new Relationship.Builder()
@ -77,15 +74,13 @@ public abstract class AbstractCouchbaseProcessor extends AbstractProcessor {
.description("All FlowFiles that cannot written to Couchbase Server and can't be retried are routed to this relationship.")
.build();
public static final PropertyDescriptor COUCHBASE_CLUSTER_SERVICE = new PropertyDescriptor
.Builder().name("Couchbase Cluster Controller Service")
public static final PropertyDescriptor COUCHBASE_CLUSTER_SERVICE = new PropertyDescriptor.Builder().name("Couchbase Cluster Controller Service")
.description("A Couchbase Cluster Controller Service which manages connections to a Couchbase cluster.")
.required(true)
.identifiesControllerService(CouchbaseClusterControllerService.class)
.build();
public static final PropertyDescriptor BUCKET_NAME = new PropertyDescriptor
.Builder().name("Bucket Name")
public static final PropertyDescriptor BUCKET_NAME = new PropertyDescriptor.Builder().name("Bucket Name")
.description("The name of bucket to access.")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
@ -115,6 +110,7 @@ public abstract class AbstractCouchbaseProcessor extends AbstractProcessor {
/**
* Add processor specific properties.
*
* @param descriptors add properties to this list
*/
protected void addSupportedProperties(List<PropertyDescriptor> descriptors) {
@ -123,6 +119,7 @@ public abstract class AbstractCouchbaseProcessor extends AbstractProcessor {
/**
* Add processor specific relationships.
*
* @param relationships add relationships to this list
*/
protected void addSupportedRelationships(Set<Relationship> relationships) {
@ -140,11 +137,11 @@ public abstract class AbstractCouchbaseProcessor extends AbstractProcessor {
}
private CouchbaseClusterControllerService getClusterService(final ProcessContext context) {
if(clusterService == null){
synchronized(AbstractCouchbaseProcessor.class){
if(clusterService == null){
if (clusterService == null) {
synchronized (AbstractCouchbaseProcessor.class) {
if (clusterService == null) {
clusterService = context.getProperty(COUCHBASE_CLUSTER_SERVICE)
.asControllerService(CouchbaseClusterControllerService.class);
.asControllerService(CouchbaseClusterControllerService.class);
}
}
}
@ -154,6 +151,7 @@ public abstract class AbstractCouchbaseProcessor extends AbstractProcessor {
/**
* Open a bucket connection using a CouchbaseClusterControllerService.
*
* @param context a process context
* @return a bucket instance
*/
@ -163,18 +161,17 @@ public abstract class AbstractCouchbaseProcessor extends AbstractProcessor {
/**
* Generate a transit url.
*
* @param context a process context
* @return a transit url based on the bucket name and the CouchbaseClusterControllerService name
*/
protected String getTransitUrl(final ProcessContext context) {
return new StringBuilder(context.getProperty(BUCKET_NAME).getValue())
.append('@')
.append(context.getProperty(COUCHBASE_CLUSTER_SERVICE).getValue())
.toString();
protected String getTransitUrl(final ProcessContext context, final String docId) {
return "couchbase://" + context.getProperty(BUCKET_NAME).getValue() + "/" + docId;
}
/**
* Handles the thrown CocuhbaseException accordingly.
* Handles the thrown CouchbaseException accordingly.
*
* @param context a process context
* @param session a process session
* @param logger a logger
@ -183,35 +180,39 @@ public abstract class AbstractCouchbaseProcessor extends AbstractProcessor {
* @param errMsg a message to be logged
*/
protected void handleCouchbaseException(final ProcessContext context, final ProcessSession session,
final ProcessorLog logger, FlowFile inFile, CouchbaseException e,
String errMsg) {
final ProcessorLog logger, FlowFile inFile, CouchbaseException e,
String errMsg) {
logger.error(errMsg, e);
if(inFile != null){
if (inFile != null) {
ErrorHandlingStrategy strategy = CouchbaseExceptionMappings.getStrategy(e);
switch(strategy.penalty()) {
case Penalize:
if(logger.isDebugEnabled()) logger.debug("Penalized: {}", new Object[]{inFile});
inFile = session.penalize(inFile);
break;
case Yield:
if(logger.isDebugEnabled()) logger.debug("Yielded context: {}", new Object[]{inFile});
context.yield();
break;
case None:
break;
switch (strategy.penalty()) {
case Penalize:
if (logger.isDebugEnabled()) {
logger.debug("Penalized: {}", new Object[] {inFile});
}
inFile = session.penalize(inFile);
break;
case Yield:
if (logger.isDebugEnabled()) {
logger.debug("Yielded context: {}", new Object[] {inFile});
}
context.yield();
break;
case None:
break;
}
switch(strategy.result()) {
case ProcessException:
throw new ProcessException(errMsg, e);
case Failure:
inFile = session.putAttribute(inFile, CouchbaseAttributes.Exception.key(), e.getClass().getName());
session.transfer(inFile, REL_FAILURE);
break;
case Retry:
inFile = session.putAttribute(inFile, CouchbaseAttributes.Exception.key(), e.getClass().getName());
session.transfer(inFile, REL_RETRY);
break;
switch (strategy.result()) {
case ProcessException:
throw new ProcessException(errMsg, e);
case Failure:
inFile = session.putAttribute(inFile, CouchbaseAttributes.Exception.key(), e.getClass().getName());
session.transfer(inFile, REL_FAILURE);
break;
case Retry:
inFile = session.putAttribute(inFile, CouchbaseAttributes.Exception.key(), e.getClass().getName());
session.transfer(inFile, REL_RETRY);
break;
}
}
}
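
For reference, the reworked getTransitUrl(context, docId) above now builds a per-document URI instead of the old bucket@service string. A tiny illustration of the resulting format; the bucket name and document id below are hypothetical:

// Illustration only: mirrors the string built by getTransitUrl(context, docId).
public class TransitUrlExample {
    public static void main(final String[] args) {
        final String bucketName = "example-bucket"; // assumed value of the Bucket Name property
        final String docId = "user-1234";           // assumed resolved document id
        System.out.println("couchbase://" + bucketName + "/" + docId);
        // prints: couchbase://example-bucket/user-1234
    }
}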

View File

@ -78,7 +78,7 @@ public class CouchbaseExceptionMappings {
mapping.put(ReplicaNotConfiguredException.class, ConfigurationError);
// when a particular Service(KV, View, Query, DCP) isn't running in a cluster
mapping.put(ServiceNotAvailableException.class, ConfigurationError);
// SSL configuration error, such as key store mis configuration.
// SSL configuration error, such as key store misconfiguration.
mapping.put(SSLException.class, ConfigurationError);
/*

View File

@ -16,18 +16,19 @@
*/
package org.apache.nifi.processors.couchbase;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.ReadsAttributes;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
@ -43,6 +44,7 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.stream.io.StreamUtils;
import com.couchbase.client.core.CouchbaseException;
@ -52,31 +54,30 @@ import com.couchbase.client.java.document.Document;
import com.couchbase.client.java.document.RawJsonDocument;
import com.couchbase.client.java.error.DocumentDoesNotExistException;
@Tags({ "nosql", "couchbase", "database", "get" })
@CapabilityDescription("Get a document from Couchbase Server via Key/Value access. This processor can be triggered by an incoming FlowFile, or it can be scheduled on a timer")
@Tags({"nosql", "couchbase", "database", "get"})
@InputRequirement(Requirement.INPUT_REQUIRED)
@CapabilityDescription("Get a document from Couchbase Server via Key/Value access. The ID of the document to fetch may be supplied by setting the <Document Id> property. "
+ "NOTE: if the Document Id property is not set, the contents of the FlowFile will be read to determine the Document Id, which means that the contents of the entire "
+ "FlowFile will be buffered in memory.")
@SeeAlso({CouchbaseClusterControllerService.class})
@ReadsAttributes({
@ReadsAttribute(attribute = "FlowFile content", description = "Used as a document id if 'Document Id' is not specified"),
@ReadsAttribute(attribute = "*", description = "Any attribute can be used as part of a document id by 'Document Id' expression.")
})
@WritesAttributes({
@WritesAttribute(attribute="couchbase.cluster", description="Cluster where the document was retrieved from."),
@WritesAttribute(attribute="couchbase.bucket", description="Bucket where the document was retrieved from."),
@WritesAttribute(attribute="couchbase.doc.id", description="Id of the document."),
@WritesAttribute(attribute="couchbase.doc.cas", description="CAS of the document."),
@WritesAttribute(attribute="couchbase.doc.expiry", description="Expiration of the document."),
@WritesAttribute(attribute="couchbase.exception", description="If Couchbase related error occurs the CouchbaseException class name will be captured here.")
})
@WritesAttribute(attribute = "couchbase.cluster", description = "Cluster where the document was retrieved from."),
@WritesAttribute(attribute = "couchbase.bucket", description = "Bucket where the document was retrieved from."),
@WritesAttribute(attribute = "couchbase.doc.id", description = "Id of the document."),
@WritesAttribute(attribute = "couchbase.doc.cas", description = "CAS of the document."),
@WritesAttribute(attribute = "couchbase.doc.expiry", description = "Expiration of the document."),
@WritesAttribute(attribute = "couchbase.exception", description = "If Couchbase related error occurs the CouchbaseException class name will be captured here.")
})
public class GetCouchbaseKey extends AbstractCouchbaseProcessor {
@Override
protected void addSupportedProperties(List<PropertyDescriptor> descriptors) {
protected void addSupportedProperties(final List<PropertyDescriptor> descriptors) {
descriptors.add(DOCUMENT_TYPE);
descriptors.add(DOC_ID);
}
@Override
protected void addSupportedRelationships(Set<Relationship> relationships) {
protected void addSupportedRelationships(final Set<Relationship> relationships) {
relationships.add(REL_SUCCESS);
relationships.add(REL_ORIGINAL);
relationships.add(REL_RETRY);
@ -85,11 +86,15 @@ public class GetCouchbaseKey extends AbstractCouchbaseProcessor {
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
final ProcessorLog logger = getLogger();
FlowFile inFile = session.get();
if (inFile == null) {
return;
}
final long startNanos = System.nanoTime();
final ProcessorLog logger = getLogger();
String docId = null;
if(!StringUtils.isEmpty(context.getProperty(DOC_ID).getValue())){
if (!StringUtils.isEmpty(context.getProperty(DOC_ID).getValue())) {
docId = context.getProperty(DOC_ID).evaluateAttributeExpressions(inFile).getValue();
} else {
final byte[] content = new byte[(int) inFile.getSize()];
@ -102,63 +107,74 @@ public class GetCouchbaseKey extends AbstractCouchbaseProcessor {
docId = new String(content, StandardCharsets.UTF_8);
}
if(StringUtils.isEmpty(docId)){
if(inFile != null){
throw new ProcessException("Please check 'Document Id' setting. Couldn't get document id from " + inFile);
}
if (StringUtils.isEmpty(docId)) {
throw new ProcessException("Please check 'Document Id' setting. Couldn't get document id from " + inFile);
}
try {
Document<?> doc = null;
byte[] content = null;
Bucket bucket = openBucket(context);
DocumentType documentType = DocumentType.valueOf(context.getProperty(DOCUMENT_TYPE).getValue());
switch (documentType){
case Json : {
final Document<?> doc;
final byte[] content;
final Bucket bucket = openBucket(context);
final DocumentType documentType = DocumentType.valueOf(context.getProperty(DOCUMENT_TYPE).getValue());
switch (documentType) {
case Json: {
RawJsonDocument document = bucket.get(docId, RawJsonDocument.class);
if(document != null){
if (document == null) {
doc = null;
content = null;
} else {
content = document.content().getBytes(StandardCharsets.UTF_8);
doc = document;
}
break;
}
case Binary : {
case Binary: {
BinaryDocument document = bucket.get(docId, BinaryDocument.class);
if(document != null){
if (document == null) {
doc = null;
content = null;
} else {
content = document.content().array();
doc = document;
}
break;
}
default: {
doc = null;
content = null;
}
}
if(doc == null) {
logger.warn("Document {} was not found in {}", new Object[]{docId, getTransitUrl(context)});
if(inFile != null){
inFile = session.putAttribute(inFile, CouchbaseAttributes.Exception.key(), DocumentDoesNotExistException.class.getName());
session.transfer(inFile, REL_FAILURE);
}
if (doc == null) {
logger.error("Document {} was not found in {}; routing {} to failure", new Object[] {docId, getTransitUrl(context, docId), inFile});
inFile = session.putAttribute(inFile, CouchbaseAttributes.Exception.key(), DocumentDoesNotExistException.class.getName());
session.transfer(inFile, REL_FAILURE);
return;
}
if(inFile != null){
session.transfer(inFile, REL_ORIGINAL);
}
FlowFile outFile = session.create(inFile);
outFile = session.write(outFile, new OutputStreamCallback() {
@Override
public void process(final OutputStream out) throws IOException {
out.write(content);
}
});
FlowFile outFile = session.create();
outFile = session.importFrom(new ByteArrayInputStream(content), outFile);
Map<String, String> updatedAttrs = new HashMap<>();
final Map<String, String> updatedAttrs = new HashMap<>();
updatedAttrs.put(CouchbaseAttributes.Cluster.key(), context.getProperty(COUCHBASE_CLUSTER_SERVICE).getValue());
updatedAttrs.put(CouchbaseAttributes.Bucket.key(), context.getProperty(BUCKET_NAME).getValue());
updatedAttrs.put(CouchbaseAttributes.DocId.key(), docId);
updatedAttrs.put(CouchbaseAttributes.Cas.key(), String.valueOf(doc.cas()));
updatedAttrs.put(CouchbaseAttributes.Expiry.key(), String.valueOf(doc.expiry()));
outFile = session.putAllAttributes(outFile, updatedAttrs);
session.getProvenanceReporter().receive(outFile, getTransitUrl(context));
session.transfer(outFile, REL_SUCCESS);
} catch (CouchbaseException e){
String errMsg = String.format("Getting docuement %s from Couchbase Server using %s failed due to %s", docId, inFile, e);
final long fetchMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
session.getProvenanceReporter().fetch(outFile, getTransitUrl(context, docId), fetchMillis);
session.transfer(outFile, REL_SUCCESS);
session.transfer(inFile, REL_ORIGINAL);
} catch (final CouchbaseException e) {
String errMsg = String.format("Getting document %s from Couchbase Server using %s failed due to %s", docId, inFile, e);
handleCouchbaseException(context, session, logger, inFile, e, errMsg);
}
}

View File

@ -54,39 +54,36 @@ import com.couchbase.client.java.document.BinaryDocument;
import com.couchbase.client.java.document.Document;
import com.couchbase.client.java.document.RawJsonDocument;
@Tags({ "nosql", "couchbase", "database", "put" })
@Tags({"nosql", "couchbase", "database", "put"})
@CapabilityDescription("Put a document to Couchbase Server via Key/Value access.")
@SeeAlso({CouchbaseClusterControllerService.class})
@ReadsAttributes({
@ReadsAttribute(attribute = "uuid", description = "Used as a document id if 'Document Id' is not specified"),
@ReadsAttribute(attribute = "*", description = "Any attribute can be used as part of a document id by 'Document Id' expression.")
})
})
@WritesAttributes({
@WritesAttribute(attribute="couchbase.cluster", description="Cluster where the document was stored."),
@WritesAttribute(attribute="couchbase.bucket", description="Bucket where the document was stored."),
@WritesAttribute(attribute="couchbase.doc.id", description="Id of the document."),
@WritesAttribute(attribute="couchbase.doc.cas", description="CAS of the document."),
@WritesAttribute(attribute="couchbase.doc.expiry", description="Expiration of the document."),
@WritesAttribute(attribute="couchbase.exception", description="If Couchbase related error occurs the CouchbaseException class name will be captured here.")
})
@WritesAttribute(attribute = "couchbase.cluster", description = "Cluster where the document was stored."),
@WritesAttribute(attribute = "couchbase.bucket", description = "Bucket where the document was stored."),
@WritesAttribute(attribute = "couchbase.doc.id", description = "Id of the document."),
@WritesAttribute(attribute = "couchbase.doc.cas", description = "CAS of the document."),
@WritesAttribute(attribute = "couchbase.doc.expiry", description = "Expiration of the document."),
@WritesAttribute(attribute = "couchbase.exception", description = "If Couchbase related error occurs the CouchbaseException class name will be captured here.")
})
public class PutCouchbaseKey extends AbstractCouchbaseProcessor {
public static final PropertyDescriptor PERSIST_TO = new PropertyDescriptor
.Builder().name("Persist To")
.description("Durability constraint about disk persistence.")
.required(true)
.allowableValues(PersistTo.values())
.defaultValue(PersistTo.NONE.toString())
.build();
public static final PropertyDescriptor PERSIST_TO = new PropertyDescriptor.Builder().name("Persist To")
.description("Durability constraint about disk persistence.")
.required(true)
.allowableValues(PersistTo.values())
.defaultValue(PersistTo.NONE.toString())
.build();
public static final PropertyDescriptor REPLICATE_TO = new PropertyDescriptor
.Builder().name("Replicate To")
.description("Durability constraint about replication.")
.required(true)
.allowableValues(ReplicateTo.values())
.defaultValue(ReplicateTo.NONE.toString())
.build();
public static final PropertyDescriptor REPLICATE_TO = new PropertyDescriptor.Builder().name("Replicate To")
.description("Durability constraint about replication.")
.required(true)
.allowableValues(ReplicateTo.values())
.defaultValue(ReplicateTo.NONE.toString())
.build();
@Override
protected void addSupportedProperties(List<PropertyDescriptor> descriptors) {
@ -107,7 +104,7 @@ public class PutCouchbaseKey extends AbstractCouchbaseProcessor {
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
final ProcessorLog logger = getLogger();
FlowFile flowFile = session.get();
if ( flowFile == null ) {
if (flowFile == null) {
return;
}
@ -119,41 +116,42 @@ public class PutCouchbaseKey extends AbstractCouchbaseProcessor {
}
});
String docId = String.valueOf(flowFile.getAttribute(CoreAttributes.UUID.key()));
if(!StringUtils.isEmpty(context.getProperty(DOC_ID).getValue())){
String docId = flowFile.getAttribute(CoreAttributes.UUID.key());
if (!StringUtils.isEmpty(context.getProperty(DOC_ID).getValue())) {
docId = context.getProperty(DOC_ID).evaluateAttributeExpressions(flowFile).getValue();
}
try {
Document<?> doc = null;
DocumentType documentType = DocumentType.valueOf(context.getProperty(DOCUMENT_TYPE).getValue());
switch (documentType){
case Json : {
final DocumentType documentType = DocumentType.valueOf(context.getProperty(DOCUMENT_TYPE).getValue());
switch (documentType) {
case Json: {
doc = RawJsonDocument.create(docId, new String(content, StandardCharsets.UTF_8));
break;
}
case Binary : {
ByteBuf buf = Unpooled.copiedBuffer(content);
case Binary: {
final ByteBuf buf = Unpooled.copiedBuffer(content);
doc = BinaryDocument.create(docId, buf);
break;
}
}
PersistTo persistTo = PersistTo.valueOf(context.getProperty(PERSIST_TO).getValue());
ReplicateTo replicateTo = ReplicateTo.valueOf(context.getProperty(REPLICATE_TO).getValue());
final PersistTo persistTo = PersistTo.valueOf(context.getProperty(PERSIST_TO).getValue());
final ReplicateTo replicateTo = ReplicateTo.valueOf(context.getProperty(REPLICATE_TO).getValue());
doc = openBucket(context).upsert(doc, persistTo, replicateTo);
Map<String, String> updatedAttrs = new HashMap<>();
final Map<String, String> updatedAttrs = new HashMap<>();
updatedAttrs.put(CouchbaseAttributes.Cluster.key(), context.getProperty(COUCHBASE_CLUSTER_SERVICE).getValue());
updatedAttrs.put(CouchbaseAttributes.Bucket.key(), context.getProperty(BUCKET_NAME).getValue());
updatedAttrs.put(CouchbaseAttributes.DocId.key(), docId);
updatedAttrs.put(CouchbaseAttributes.Cas.key(), String.valueOf(doc.cas()));
updatedAttrs.put(CouchbaseAttributes.Expiry.key(), String.valueOf(doc.expiry()));
flowFile = session.putAllAttributes(flowFile, updatedAttrs);
session.getProvenanceReporter().send(flowFile, getTransitUrl(context));
session.transfer(flowFile, REL_SUCCESS);
} catch (CouchbaseException e) {
String errMsg = String.format("Writing docuement %s to Couchbase Server using %s failed due to %s", docId, flowFile, e);
flowFile = session.putAllAttributes(flowFile, updatedAttrs);
session.getProvenanceReporter().send(flowFile, getTransitUrl(context, docId));
session.transfer(flowFile, REL_SUCCESS);
} catch (final CouchbaseException e) {
String errMsg = String.format("Writing document %s to Couchbase Server using %s failed due to %s", docId, flowFile, e);
handleCouchbaseException(context, session, logger, flowFile, e, errMsg);
}
}

View File

@ -100,10 +100,11 @@ public class TestGetCouchbaseKey {
testRunner.setProperty(BUCKET_NAME, bucketName);
testRunner.setProperty(DOC_ID, docId);
testRunner.enqueue(new byte[0]);
testRunner.run();
testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
testRunner.assertTransferCount(REL_SUCCESS, 1);
testRunner.assertTransferCount(REL_ORIGINAL, 1);
testRunner.assertTransferCount(REL_RETRY, 0);
testRunner.assertTransferCount(REL_FAILURE, 0);
MockFlowFile outFile = testRunner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
@ -145,7 +146,7 @@ public class TestGetCouchbaseKey {
}
@Test
public void testDocIdExpWithNullFlowFile() throws Exception {
public void testDocIdExpWithEmptyFlowFile() throws Exception {
String docIdExp = "doc-s";
String docId = "doc-s";
@ -157,10 +158,11 @@ public class TestGetCouchbaseKey {
testRunner.setProperty(DOC_ID, docIdExp);
testRunner.enqueue(new byte[0]);
testRunner.run();
testRunner.assertTransferCount(REL_SUCCESS, 1);
testRunner.assertTransferCount(REL_ORIGINAL, 0);
testRunner.assertTransferCount(REL_ORIGINAL, 1);
testRunner.assertTransferCount(REL_RETRY, 0);
testRunner.assertTransferCount(REL_FAILURE, 0);
MockFlowFile outFile = testRunner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
@ -179,11 +181,12 @@ public class TestGetCouchbaseKey {
setupMockBucket(bucket);
testRunner.setProperty(DOC_ID, docIdExp);
testRunner.enqueue(new byte[0]);
try {
testRunner.run();
fail("Exception should be thrown.");
} catch (AssertionError e){
} catch (AssertionError e) {
Assert.assertTrue(e.getCause().getClass().equals(AttributeExpressionLanguageException.class));
}
@ -210,7 +213,7 @@ public class TestGetCouchbaseKey {
try {
testRunner.run();
fail("Exception should be thrown.");
} catch (AssertionError e){
} catch (AssertionError e) {
Assert.assertTrue(e.getCause().getClass().equals(AttributeExpressionLanguageException.class));
}
@ -288,7 +291,7 @@ public class TestGetCouchbaseKey {
try {
testRunner.run();
fail("ProcessException should be thrown.");
} catch (AssertionError e){
} catch (AssertionError e) {
Assert.assertTrue(e.getCause().getClass().equals(ProcessException.class));
}
@ -315,7 +318,7 @@ public class TestGetCouchbaseKey {
try {
testRunner.run();
fail("ProcessException should be thrown.");
} catch (AssertionError e){
} catch (AssertionError e) {
Assert.assertTrue(e.getCause().getClass().equals(ProcessException.class));
Assert.assertTrue(e.getCause().getCause().getClass().equals(AuthenticationException.class));
}

View File

@ -32,8 +32,6 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@ -80,19 +78,19 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
private static final Logger logger = LoggerFactory.getLogger(StandardFlowFileQueue.class);
private PriorityQueue<FlowFileRecord> activeQueue = null;
// guarded by lock
private ArrayList<FlowFileRecord> swapQueue = null;
private final AtomicReference<FlowFileQueueSize> size = new AtomicReference<>(new FlowFileQueueSize(0, 0L, 0, 0L, 0, 0L));
private boolean swapMode = false;
private volatile String maximumQueueDataSize;
private volatile long maximumQueueByteCount;
private volatile long maximumQueueObjectCount;
private final AtomicReference<MaxQueueSize> maxQueueSize = new AtomicReference<>(new MaxQueueSize("0 MB", 0L, 0L));
private final AtomicReference<TimePeriod> expirationPeriod = new AtomicReference<>(new TimePeriod("0 mins", 0L));
private final EventReporter eventReporter;
private final AtomicLong flowFileExpirationMillis;
private final Connection connection;
private final AtomicReference<String> flowFileExpirationPeriod;
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
private final List<FlowFilePrioritizer> priorities;
private final int swapThreshold;
@ -106,8 +104,6 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
private final ProvenanceEventRepository provRepository;
private final ResourceClaimManager resourceClaimManager;
private final AtomicBoolean queueFullRef = new AtomicBoolean(false);
// SCHEDULER CANNOT BE NOTIFIED OF EVENTS WITH THE WRITE LOCK HELD! DOING SO WILL RESULT IN A DEADLOCK!
private final ProcessScheduler scheduler;
@ -115,11 +111,6 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
final ResourceClaimManager resourceClaimManager, final ProcessScheduler scheduler, final FlowFileSwapManager swapManager, final EventReporter eventReporter, final int swapThreshold) {
activeQueue = new PriorityQueue<>(20, new Prioritizer(new ArrayList<FlowFilePrioritizer>()));
priorities = new ArrayList<>();
maximumQueueObjectCount = 0L;
maximumQueueDataSize = "0 MB";
maximumQueueByteCount = 0L;
flowFileExpirationMillis = new AtomicLong(0);
flowFileExpirationPeriod = new AtomicReference<>("0 mins");
swapQueue = new ArrayList<>();
this.eventReporter = eventReporter;
this.swapManager = swapManager;
@ -161,36 +152,35 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
}
@Override
public void setBackPressureObjectThreshold(final long maxQueueSize) {
writeLock.lock();
try {
maximumQueueObjectCount = maxQueueSize;
this.queueFullRef.set(determineIfFull());
} finally {
writeLock.unlock("setBackPressureObjectThreshold");
public void setBackPressureObjectThreshold(final long threshold) {
boolean updated = false;
while (!updated) {
MaxQueueSize maxSize = maxQueueSize.get();
final MaxQueueSize updatedSize = new MaxQueueSize(maxSize.getMaxSize(), maxSize.getMaxBytes(), threshold);
updated = maxQueueSize.compareAndSet(maxSize, updatedSize);
}
}
@Override
public long getBackPressureObjectThreshold() {
return maximumQueueObjectCount;
return maxQueueSize.get().getMaxCount();
}
@Override
public void setBackPressureDataSizeThreshold(final String maxDataSize) {
writeLock.lock();
try {
maximumQueueByteCount = DataUnit.parseDataSize(maxDataSize, DataUnit.B).longValue();
maximumQueueDataSize = maxDataSize;
this.queueFullRef.set(determineIfFull());
} finally {
writeLock.unlock("setBackPressureDataSizeThreshold");
final long maxBytes = DataUnit.parseDataSize(maxDataSize, DataUnit.B).longValue();
boolean updated = false;
while (!updated) {
MaxQueueSize maxSize = maxQueueSize.get();
final MaxQueueSize updatedSize = new MaxQueueSize(maxDataSize, maxBytes, maxSize.getMaxCount());
updated = maxQueueSize.compareAndSet(maxSize, updatedSize);
}
}
@Override
public String getBackPressureDataSizeThreshold() {
return maximumQueueDataSize;
return maxQueueSize.get().getMaxSize();
}
@Override
@ -220,17 +210,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
@Override
public void acknowledge(final FlowFileRecord flowFile) {
if (queueFullRef.get()) {
writeLock.lock();
try {
incrementUnacknowledgedQueueSize(-1, -flowFile.getSize());
queueFullRef.set(determineIfFull());
} finally {
writeLock.unlock("acknowledge(FlowFileRecord)");
}
} else {
incrementUnacknowledgedQueueSize(-1, -flowFile.getSize());
}
incrementUnacknowledgedQueueSize(-1, -flowFile.getSize());
if (connection.getSource().getSchedulingStrategy() == SchedulingStrategy.EVENT_DRIVEN) {
// queue was full but no longer is. Notify that the source may now be available to run,
@ -246,17 +226,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
totalSize += flowFile.getSize();
}
if (queueFullRef.get()) {
writeLock.lock();
try {
incrementUnacknowledgedQueueSize(-flowFiles.size(), -totalSize);
queueFullRef.set(determineIfFull());
} finally {
writeLock.unlock("acknowledge(FlowFileRecord)");
}
} else {
incrementUnacknowledgedQueueSize(-flowFiles.size(), -totalSize);
}
incrementUnacknowledgedQueueSize(-flowFiles.size(), -totalSize);
if (connection.getSource().getSchedulingStrategy() == SchedulingStrategy.EVENT_DRIVEN) {
// it's possible that queue was full but no longer is. Notify that the source may now be available to run,
@ -267,33 +237,26 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
@Override
public boolean isFull() {
return queueFullRef.get();
}
final MaxQueueSize maxSize = maxQueueSize.get();
/**
* MUST be called with either the read or write lock held
*
* @return true if full
*/
private boolean determineIfFull() {
final long maxSize = maximumQueueObjectCount;
final long maxBytes = maximumQueueByteCount;
if (maxSize <= 0 && maxBytes <= 0) {
// Check if max size is set
if (maxSize.getMaxBytes() <= 0 && maxSize.getMaxCount() <= 0) {
return false;
}
final QueueSize queueSize = getQueueSize();
if (maxSize > 0 && queueSize.getObjectCount() >= maxSize) {
if (maxSize.getMaxCount() > 0 && queueSize.getObjectCount() >= maxSize.getMaxCount()) {
return true;
}
if (maxBytes > 0 && queueSize.getByteCount() >= maxBytes) {
if (maxSize.getMaxBytes() > 0 && queueSize.getByteCount() >= maxSize.getMaxBytes()) {
return true;
}
return false;
}
@Override
public void put(final FlowFileRecord file) {
writeLock.lock();
@ -307,8 +270,6 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
incrementActiveQueueSize(1, file.getSize());
activeQueue.add(file);
}
queueFullRef.set(determineIfFull());
} finally {
writeLock.unlock("put(FlowFileRecord)");
}
@ -337,8 +298,6 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
incrementActiveQueueSize(numFiles, bytes);
activeQueue.addAll(files);
}
queueFullRef.set(determineIfFull());
} finally {
writeLock.unlock("putAll");
}
@ -374,7 +333,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
FlowFileRecord flowFile = null;
// First check if we have any records Pre-Fetched.
final long expirationMillis = flowFileExpirationMillis.get();
final long expirationMillis = expirationPeriod.get().getMillis();
writeLock.lock();
try {
flowFile = doPoll(expiredRecords, expirationMillis);
@ -393,10 +352,8 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
boolean isExpired;
migrateSwapToActive();
final boolean queueFullAtStart = queueFullRef.get();
long expiredBytes = 0L;
do {
flowFile = this.activeQueue.poll();
@ -424,13 +381,6 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
incrementActiveQueueSize(-expiredRecords.size(), -expiredBytes);
}
// if at least 1 FlowFile was expired & the queue was full before we started, then
// we need to determine whether or not the queue is full again. If no FlowFile was expired,
// then the queue will still be full until the appropriate #acknowledge method is called.
if (queueFullAtStart && !expiredRecords.isEmpty()) {
queueFullRef.set(determineIfFull());
}
return flowFile;
}
@ -451,8 +401,6 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
private void doPoll(final List<FlowFileRecord> records, int maxResults, final Set<FlowFileRecord> expiredRecords) {
migrateSwapToActive();
final boolean queueFullAtStart = queueFullRef.get();
final long bytesDrained = drainQueue(activeQueue, records, maxResults, expiredRecords);
long expiredBytes = 0L;
@ -462,13 +410,6 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
incrementActiveQueueSize(-(expiredRecords.size() + records.size()), -bytesDrained);
incrementUnacknowledgedQueueSize(records.size(), bytesDrained - expiredBytes);
// if at least 1 FlowFile was expired & the queue was full before we started, then
// we need to determine whether or not the queue is full again. If no FlowFile was expired,
// then the queue will still be full until the appropriate #acknowledge method is called.
if (queueFullAtStart && !expiredRecords.isEmpty()) {
queueFullRef.set(determineIfFull());
}
}
/**
@ -660,7 +601,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
long drainedSize = 0L;
FlowFileRecord pulled = null;
final long expirationMillis = this.flowFileExpirationMillis.get();
final long expirationMillis = expirationPeriod.get().getMillis();
while (destination.size() < maxResults && (pulled = sourceQueue.poll()) != null) {
if (isLaterThan(getExpirationDate(pulled, expirationMillis))) {
expiredRecords.add(pulled);
@ -688,8 +629,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
try {
migrateSwapToActive();
final long expirationMillis = this.flowFileExpirationMillis.get();
final boolean queueFullAtStart = queueFullRef.get();
final long expirationMillis = expirationPeriod.get().getMillis();
final List<FlowFileRecord> selectedFlowFiles = new ArrayList<>();
final List<FlowFileRecord> unselected = new ArrayList<>();
@ -734,17 +674,10 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
}
this.activeQueue.addAll(unselected);
// if at least 1 FlowFile was expired & the queue was full before we started, then
// we need to determine whether or not the queue is full again. If no FlowFile was expired,
// then the queue will still be full until the appropriate #acknowledge method is called.
if (queueFullAtStart && !expiredRecords.isEmpty()) {
queueFullRef.set(determineIfFull());
}
incrementActiveQueueSize(-flowFilesPulled, -bytesPulled);
return selectedFlowFiles;
} finally {
incrementActiveQueueSize(-flowFilesPulled, -bytesPulled);
writeLock.unlock("poll(Filter, Set)");
}
}
@ -817,12 +750,12 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
@Override
public String getFlowFileExpiration() {
return flowFileExpirationPeriod.get();
return expirationPeriod.get().getPeriod();
}
@Override
public int getFlowFileExpiration(final TimeUnit timeUnit) {
return (int) timeUnit.convert(flowFileExpirationMillis.get(), TimeUnit.MILLISECONDS);
return (int) timeUnit.convert(expirationPeriod.get().getMillis(), TimeUnit.MILLISECONDS);
}
@Override
@ -831,8 +764,8 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
if (millis < 0) {
throw new IllegalArgumentException("FlowFile Expiration Period must be positive");
}
this.flowFileExpirationPeriod.set(flowExpirationPeriod);
this.flowFileExpirationMillis.set(millis);
expirationPeriod.set(new TimePeriod(flowExpirationPeriod, millis));
}
@ -1287,4 +1220,57 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
" Bytes], Unacknowledged=[" + unacknowledgedCount + ", " + unacknowledgedBytes + " Bytes] ]";
}
}
private static class MaxQueueSize {
private final String maxSize;
private final long maxBytes;
private final long maxCount;
public MaxQueueSize(final String maxSize, final long maxBytes, final long maxCount) {
this.maxSize = maxSize;
this.maxBytes = maxBytes;
this.maxCount = maxCount;
}
public String getMaxSize() {
return maxSize;
}
public long getMaxBytes() {
return maxBytes;
}
public long getMaxCount() {
return maxCount;
}
@Override
public String toString() {
return maxCount + " Objects/" + maxSize;
}
}
private static class TimePeriod {
private final String period;
private final long millis;
public TimePeriod(final String period, final long millis) {
this.period = period;
this.millis = millis;
}
public String getPeriod() {
return period;
}
public long getMillis() {
return millis;
}
@Override
public String toString() {
return period;
}
}
}
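
The changes above replace the lock-guarded maximumQueue*/flowFileExpiration* fields with immutable value objects (MaxQueueSize, TimePeriod) held in AtomicReferences and swapped via compare-and-set, which is why isFull() is now computed on demand and the queueFullRef bookkeeping disappears. A minimal, self-contained sketch of that pattern under assumed names (not the actual StandardFlowFileQueue code):

import java.util.concurrent.atomic.AtomicReference;

// Sketch: immutable settings object published through an AtomicReference and
// updated with a compareAndSet retry loop instead of a read/write lock.
public class BackPressureSettings {

    // Immutable snapshot of both thresholds, so readers never observe a half-updated pair.
    private static final class Thresholds {
        final long maxBytes;
        final long maxCount;

        Thresholds(final long maxBytes, final long maxCount) {
            this.maxBytes = maxBytes;
            this.maxCount = maxCount;
        }
    }

    private final AtomicReference<Thresholds> ref = new AtomicReference<>(new Thresholds(0L, 0L));

    public void setMaxCount(final long maxCount) {
        Thresholds current;
        Thresholds updated;
        do {
            current = ref.get();
            updated = new Thresholds(current.maxBytes, maxCount);
        } while (!ref.compareAndSet(current, updated)); // retry if another thread won the race
    }

    // Computed on demand from the current snapshot, as isFull() now does above.
    public boolean isFull(final long queuedBytes, final long queuedCount) {
        final Thresholds t = ref.get();
        if (t.maxBytes <= 0 && t.maxCount <= 0) {
            return false; // back pressure not configured
        }
        return (t.maxCount > 0 && queuedCount >= t.maxCount)
                || (t.maxBytes > 0 && queuedBytes >= t.maxBytes);
    }
}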

View File

@ -18,6 +18,7 @@
package org.apache.nifi.controller;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
@ -48,18 +49,28 @@ import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.FlowFilePrioritizer;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.FlowFileFilter;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventRepository;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.provenance.StandardProvenanceEventRecord;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
public class TestStandardFlowFileQueue {
private TestSwapManager swapManager = null;
private StandardFlowFileQueue queue = null;
private List<ProvenanceEventRecord> provRecords = new ArrayList<>();
@Before
@SuppressWarnings("unchecked")
public void setup() {
provRecords.clear();
final Connection connection = Mockito.mock(Connection.class);
Mockito.when(connection.getSource()).thenReturn(Mockito.mock(Connectable.class));
Mockito.when(connection.getDestination()).thenReturn(Mockito.mock(Connectable.class));
@ -72,6 +83,16 @@ public class TestStandardFlowFileQueue {
final ResourceClaimManager claimManager = Mockito.mock(ResourceClaimManager.class);
Mockito.when(provRepo.eventBuilder()).thenReturn(new StandardProvenanceEventRecord.Builder());
Mockito.doAnswer(new Answer<Object>() {
@Override
public Object answer(final InvocationOnMock invocation) throws Throwable {
final Iterable<ProvenanceEventRecord> iterable = (Iterable<ProvenanceEventRecord>) invocation.getArguments()[0];
for (final ProvenanceEventRecord record : iterable) {
provRecords.add(record);
}
return null;
}
}).when(provRepo).registerEvents(Mockito.any(Iterable.class));
queue = new StandardFlowFileQueue("id", connection, flowFileRepo, provRepo, claimManager, scheduler, swapManager, null, 10000);
TestFlowFile.idGenerator.set(0L);
@ -106,6 +127,160 @@ public class TestStandardFlowFileQueue {
assertEquals(0L, unackSize.getByteCount());
}
@Test
public void testBackPressure() {
queue.setBackPressureObjectThreshold(10);
assertTrue(queue.isEmpty());
assertTrue(queue.isActiveQueueEmpty());
assertFalse(queue.isFull());
for (int i = 0; i < 9; i++) {
queue.put(new TestFlowFile());
assertFalse(queue.isFull());
assertFalse(queue.isEmpty());
assertFalse(queue.isActiveQueueEmpty());
}
queue.put(new TestFlowFile());
assertTrue(queue.isFull());
assertFalse(queue.isEmpty());
assertFalse(queue.isActiveQueueEmpty());
final Set<FlowFileRecord> expiredRecords = new HashSet<>();
final FlowFileRecord polled = queue.poll(expiredRecords);
assertNotNull(polled);
assertTrue(expiredRecords.isEmpty());
assertFalse(queue.isEmpty());
assertFalse(queue.isActiveQueueEmpty());
// queue is still full because FlowFile has not yet been acknowledged.
assertTrue(queue.isFull());
queue.acknowledge(polled);
// FlowFile has been acknowledged; queue should no longer be full.
assertFalse(queue.isFull());
assertFalse(queue.isEmpty());
assertFalse(queue.isActiveQueueEmpty());
}
@Test
public void testBackPressureAfterPollFilter() throws InterruptedException {
queue.setBackPressureObjectThreshold(10);
queue.setFlowFileExpiration("10 millis");
for (int i = 0; i < 9; i++) {
queue.put(new TestFlowFile());
assertFalse(queue.isFull());
}
queue.put(new TestFlowFile());
assertTrue(queue.isFull());
Thread.sleep(100L);
final FlowFileFilter filter = new FlowFileFilter() {
@Override
public FlowFileFilterResult filter(final FlowFile flowFile) {
return FlowFileFilterResult.REJECT_AND_CONTINUE;
}
};
final Set<FlowFileRecord> expiredRecords = new HashSet<>();
final List<FlowFileRecord> polled = queue.poll(filter, expiredRecords);
assertTrue(polled.isEmpty());
assertEquals(10, expiredRecords.size());
assertFalse(queue.isFull());
assertTrue(queue.isEmpty());
assertTrue(queue.isActiveQueueEmpty());
}
@Test(timeout = 10000)
public void testBackPressureAfterDrop() throws InterruptedException {
queue.setBackPressureObjectThreshold(10);
queue.setFlowFileExpiration("10 millis");
for (int i = 0; i < 9; i++) {
queue.put(new TestFlowFile());
assertFalse(queue.isFull());
}
queue.put(new TestFlowFile());
assertTrue(queue.isFull());
Thread.sleep(100L);
final String requestId = UUID.randomUUID().toString();
final DropFlowFileStatus status = queue.dropFlowFiles(requestId, "Unit Test");
while (status.getState() != DropFlowFileState.COMPLETE) {
Thread.sleep(10L);
}
assertFalse(queue.isFull());
assertTrue(queue.isEmpty());
assertTrue(queue.isActiveQueueEmpty());
assertEquals(10, provRecords.size());
for (final ProvenanceEventRecord event : provRecords) {
assertNotNull(event);
assertEquals(ProvenanceEventType.DROP, event.getEventType());
}
}
@Test
public void testBackPressureAfterPollSingle() throws InterruptedException {
queue.setBackPressureObjectThreshold(10);
queue.setFlowFileExpiration("10 millis");
for (int i = 0; i < 9; i++) {
queue.put(new TestFlowFile());
assertFalse(queue.isFull());
}
queue.put(new TestFlowFile());
assertTrue(queue.isFull());
Thread.sleep(100L);
final Set<FlowFileRecord> expiredRecords = new HashSet<>();
final FlowFileRecord polled = queue.poll(expiredRecords);
assertNull(polled);
assertEquals(10, expiredRecords.size());
assertFalse(queue.isFull());
assertTrue(queue.isEmpty());
assertTrue(queue.isActiveQueueEmpty());
}
@Test
public void testBackPressureAfterPollMultiple() throws InterruptedException {
queue.setBackPressureObjectThreshold(10);
queue.setFlowFileExpiration("10 millis");
for (int i = 0; i < 9; i++) {
queue.put(new TestFlowFile());
assertFalse(queue.isFull());
}
queue.put(new TestFlowFile());
assertTrue(queue.isFull());
Thread.sleep(100L);
final Set<FlowFileRecord> expiredRecords = new HashSet<>();
final List<FlowFileRecord> polled = queue.poll(10, expiredRecords);
assertTrue(polled.isEmpty());
assertEquals(10, expiredRecords.size());
assertFalse(queue.isFull());
assertTrue(queue.isEmpty());
assertTrue(queue.isActiveQueueEmpty());
}
@Test
public void testSwapOutOccurs() {
for (int i = 0; i < 10000; i++) {

View File

@ -133,7 +133,7 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
.description("Kerberos keytab associated with the principal. Requires nifi.kerberos.krb5.file to be set " + "in your nifi.properties").addValidator(Validator.VALID)
.addValidator(StandardValidators.FILE_EXISTS_VALIDATOR).addValidator(KERBEROS_CONFIG_VALIDATOR).build();
private static final PropertyDescriptor KERBEROS_RELOGIN_PERIOD = new PropertyDescriptor.Builder().name("Kerberos Relogin Period").required(false)
public static final PropertyDescriptor KERBEROS_RELOGIN_PERIOD = new PropertyDescriptor.Builder().name("Kerberos Relogin Period").required(false)
.description("Period of time which should pass before attempting a kerberos relogin").defaultValue("4 hours")
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR).addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();

View File

@ -83,6 +83,9 @@ public class FetchHDFS extends AbstractHadoopProcessor {
final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(HADOOP_CONFIGURATION_RESOURCES);
properties.add(FILENAME);
properties.add(KERBEROS_PRINCIPAL);
properties.add(KERBEROS_KEYTAB);
properties.add(KERBEROS_RELOGIN_PERIOD);
return properties;
}

View File

@ -138,6 +138,9 @@ public class ListHDFS extends AbstractHadoopProcessor {
properties.add(DISTRIBUTED_CACHE_SERVICE);
properties.add(DIRECTORY);
properties.add(RECURSE_SUBDIRS);
properties.add(KERBEROS_PRINCIPAL);
properties.add(KERBEROS_KEYTAB);
properties.add(KERBEROS_RELOGIN_PERIOD);
return properties;
}

View File

@ -34,6 +34,11 @@
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-utils</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.8.2.2</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.9.1</artifactId>

View File

@ -21,30 +21,47 @@ import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Pattern;
import org.apache.kafka.clients.producer.BufferExhaustedException;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
@ -56,21 +73,20 @@ import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.stream.io.util.NonThreadSafeCircularBuffer;
import org.apache.nifi.util.LongHolder;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import scala.actors.threadpool.Arrays;
@SupportsBatching
@InputRequirement(Requirement.INPUT_REQUIRED)
@Tags({ "Apache", "Kafka", "Put", "Send", "Message", "PubSub" })
@CapabilityDescription("Sends the contents of a FlowFile as a message to Apache Kafka")
public class PutKafka extends AbstractProcessor {
@Tags({"Apache", "Kafka", "Put", "Send", "Message", "PubSub"})
@CapabilityDescription("Sends the contents of a FlowFile as a message to Apache Kafka. The messages to send may be individual FlowFiles or may be delimited, using a "
+ "user-specified delimiter, such as a new-line.")
@TriggerWhenEmpty // because we have a queue of sessions that are ready to be committed
public class PutKafka extends AbstractSessionFactoryProcessor {
private static final String SINGLE_BROKER_REGEX = ".*?\\:\\d{3,5}";
private static final String BROKER_REGEX = SINGLE_BROKER_REGEX + "(?:,\\s*" + SINGLE_BROKER_REGEX + ")*";
public static final AllowableValue DELIVERY_REPLICATED = new AllowableValue("-1", "Guarantee Replicated Delivery", "FlowFile will be routed to"
public static final AllowableValue DELIVERY_REPLICATED = new AllowableValue("all", "Guarantee Replicated Delivery", "FlowFile will be routed to"
+ " failure unless the message is replicated to the appropriate number of Kafka Nodes according to the Topic configuration");
public static final AllowableValue DELIVERY_ONE_NODE = new AllowableValue("1", "Guarantee Single Node Delivery", "FlowFile will be routed"
+ " to success if the message is received by a single Kafka node, whether or not it is replicated. This is faster than"
@ -79,16 +95,6 @@ public class PutKafka extends AbstractProcessor {
+ " successfully writing the content to a Kafka node, without waiting for a response. This provides the best performance but may result"
+ " in data loss.");
/**
* AllowableValue for a Producer Type that synchronously sends messages to Kafka
*/
public static final AllowableValue PRODUCTER_TYPE_SYNCHRONOUS = new AllowableValue("sync", "Synchronous", "Send FlowFiles to Kafka immediately.");
/**
* AllowableValue for a Producer Type that asynchronously sends messages to Kafka
*/
public static final AllowableValue PRODUCTER_TYPE_ASYNCHRONOUS = new AllowableValue("async", "Asynchronous", "Batch messages before sending them to Kafka."
+ " While this will improve throughput, it opens the possibility that a failure on the client machine will drop unsent data.");
/**
* AllowableValue for sending messages to Kafka without compression
@ -105,6 +111,13 @@ public class PutKafka extends AbstractProcessor {
*/
public static final AllowableValue COMPRESSION_CODEC_SNAPPY = new AllowableValue("snappy", "Snappy", "Compress messages using Snappy");
static final AllowableValue ROUND_ROBIN_PARTITIONING = new AllowableValue("Round Robin", "Round Robin",
"Messages will be assigned partitions in a round-robin fashion, sending the first message to Partition 1, the next Partition to Partition 2, and so on, wrapping as necessary.");
static final AllowableValue RANDOM_PARTITIONING = new AllowableValue("Random Robin", "Random",
"Messages will be assigned to random partitions.");
static final AllowableValue USER_DEFINED_PARTITIONING = new AllowableValue("User-Defined", "User-Defined",
"The <Partition> property will be used to determine the partition. All messages within the same FlowFile will be assigned to the same partition.");
public static final PropertyDescriptor SEED_BROKERS = new PropertyDescriptor.Builder()
.name("Known Brokers")
@ -120,6 +133,21 @@ public class PutKafka extends AbstractProcessor {
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(true)
.build();
static final PropertyDescriptor PARTITION_STRATEGY = new PropertyDescriptor.Builder()
.name("Partition Strategy")
.description("Specifies how messages should be partitioned when sent to Kafka")
.allowableValues(ROUND_ROBIN_PARTITIONING, RANDOM_PARTITIONING, USER_DEFINED_PARTITIONING)
.defaultValue(ROUND_ROBIN_PARTITIONING.getValue())
.required(true)
.build();
public static final PropertyDescriptor PARTITION = new PropertyDescriptor.Builder()
.name("Partition")
.description("Specifies which Kafka Partition to add the message to. If using a message delimiter, all messages in the same FlowFile will be sent to the same partition. "
+ "If a partition is specified but is not valid, then all messages within the same FlowFile will use the same partition but it remains undefined which partition is used.")
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.expressionLanguageSupported(true)
.required(false)
.build();
public static final PropertyDescriptor KEY = new PropertyDescriptor.Builder()
.name("Kafka Key")
.description("The Key to use for the Message")
@ -140,7 +168,10 @@ public class PutKafka extends AbstractProcessor {
.description("Specifies the delimiter to use for splitting apart multiple messages within a single FlowFile. "
+ "If not specified, the entire content of the FlowFile will be used as a single message. "
+ "If specified, the contents of the FlowFile will be split on this delimiter and each section "
+ "sent as a separate Kafka message.")
+ "sent as a separate Kafka message. Note that if messages are delimited and some messages for a given FlowFile "
+ "are transferred successfully while others are not, the messages will be split into individual FlowFiles, such that those "
+ "messages that were successfully sent are routed to the 'success' relationship while other messages are sent to the 'failure' "
+ "relationship.")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(true)
@ -151,6 +182,13 @@ public class PutKafka extends AbstractProcessor {
.required(true)
.addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
.expressionLanguageSupported(false)
.defaultValue("5 MB")
.build();
static final PropertyDescriptor MAX_RECORD_SIZE = new PropertyDescriptor.Builder()
.name("Max Record Size")
.description("The maximum size that any individual record can be.")
.addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
.required(true)
.defaultValue("1 MB")
.build();
public static final PropertyDescriptor TIMEOUT = new PropertyDescriptor.Builder()
@ -168,20 +206,10 @@ public class PutKafka extends AbstractProcessor {
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(false)
.build();
public static final PropertyDescriptor PRODUCER_TYPE = new PropertyDescriptor.Builder()
.name("Producer Type")
.description("This parameter specifies whether the messages are sent asynchronously in a background thread.")
.required(true)
.allowableValues(PRODUCTER_TYPE_SYNCHRONOUS, PRODUCTER_TYPE_ASYNCHRONOUS)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(false)
.defaultValue(PRODUCTER_TYPE_SYNCHRONOUS.getValue())
.build();
public static final PropertyDescriptor BATCH_NUM_MESSAGES = new PropertyDescriptor.Builder()
.name("Async Batch Size")
.description("Used only if Producer Type is set to \"" + PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() + "\"."
+ " The number of messages to send in one batch when using " + PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() + " mode."
+ " The producer will wait until either this number of messages are ready"
.displayName("Batch Size")
.description("The number of messages to send in one batch. The producer will wait until either this number of messages are ready"
+ " to send or \"Queue Buffering Max Time\" is reached.")
.required(true)
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
@ -189,35 +217,13 @@ public class PutKafka extends AbstractProcessor {
.build();
public static final PropertyDescriptor QUEUE_BUFFERING_MAX = new PropertyDescriptor.Builder()
.name("Queue Buffering Max Time")
.description("Used only if Producer Type is set to \"" + PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() + "\"."
+ " Maximum time to buffer data when using " + PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() + " mode. For example a setting of 100 ms"
+ " will try to batch together 100ms of messages to send at once. This will improve"
.description("Maximum time to buffer data before sending to Kafka. For example a setting of 100 ms"
+ " will try to batch together 100 milliseconds' worth of messages to send at once. This will improve"
+ " throughput but adds message delivery latency due to the buffering.")
.required(true)
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.defaultValue("5 secs")
.build();
public static final PropertyDescriptor QUEUE_BUFFERING_MAX_MESSAGES = new PropertyDescriptor.Builder()
.name("Queue Buffer Max Count")
.description("Used only if Producer Type is set to \"" + PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() + "\"."
+ " The maximum number of unsent messages that can be queued up in the producer when"
+ " using " + PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() + " mode before either the producer must be blocked or data must be dropped.")
.required(true)
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.defaultValue("10000")
.build();
public static final PropertyDescriptor QUEUE_ENQUEUE_TIMEOUT = new PropertyDescriptor.Builder()
.name("Queue Enqueue Timeout")
.description("Used only if Producer Type is set to \"" + PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() + "\"."
+ " The amount of time to block before dropping messages when running in "
+ PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() + " mode"
+ " and the buffer has reached the \"Queue Buffer Max Count\". If set to 0, events will"
+ " be enqueued immediately or dropped if the queue is full (the producer send call will"
+ " never block). If not set, the producer will block indefinitely and never willingly"
+ " drop a send.")
.required(false)
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.build();
public static final PropertyDescriptor COMPRESSION_CODEC = new PropertyDescriptor.Builder()
.name("Compression Codec")
.description("This parameter allows you to specify the compression codec for all"
@ -227,16 +233,6 @@ public class PutKafka extends AbstractProcessor {
.allowableValues(COMPRESSION_CODEC_NONE, COMPRESSION_CODEC_GZIP, COMPRESSION_CODEC_SNAPPY)
.defaultValue(COMPRESSION_CODEC_NONE.getValue())
.build();
public static final PropertyDescriptor COMPRESSED_TOPICS = new PropertyDescriptor.Builder()
.name("Compressed Topics")
.description("This parameter allows you to set whether compression should be turned on"
+ " for particular topics. If the compression codec is anything other than"
+ " \"" + COMPRESSION_CODEC_NONE.getDisplayName() + "\", enable compression only for specified topics if any."
+ " If the list of compressed topics is empty, then enable the specified"
+ " compression codec for all topics. If the compression codec is " + COMPRESSION_CODEC_NONE.getDisplayName() + ","
+ " compression is disabled for all topics")
.required(false)
.build();
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
@ -247,7 +243,13 @@ public class PutKafka extends AbstractProcessor {
.description("Any FlowFile that cannot be sent to Kafka will be routed to this Relationship")
.build();
private final BlockingQueue<Producer<byte[], byte[]>> producers = new LinkedBlockingQueue<>();
private static final Pattern NUMBER_PATTERN = Pattern.compile("-?\\d+");
private final BlockingQueue<FlowFileMessageBatch> completeBatches = new LinkedBlockingQueue<>();
private final Set<FlowFileMessageBatch> activeBatches = Collections.synchronizedSet(new HashSet<FlowFileMessageBatch>());
private final ConcurrentMap<String, AtomicLong> partitionIndexMap = new ConcurrentHashMap<>();
private volatile Producer<byte[], byte[]> producer;
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
@ -259,36 +261,21 @@ public class PutKafka extends AbstractProcessor {
final List<PropertyDescriptor> props = new ArrayList<>();
props.add(SEED_BROKERS);
props.add(TOPIC);
props.add(PARTITION_STRATEGY);
props.add(PARTITION);
props.add(KEY);
props.add(DELIVERY_GUARANTEE);
props.add(MESSAGE_DELIMITER);
props.add(MAX_BUFFER_SIZE);
props.add(MAX_RECORD_SIZE);
props.add(TIMEOUT);
props.add(PRODUCER_TYPE);
props.add(BATCH_NUM_MESSAGES);
props.add(QUEUE_BUFFERING_MAX_MESSAGES);
props.add(QUEUE_BUFFERING_MAX);
props.add(QUEUE_ENQUEUE_TIMEOUT);
props.add(COMPRESSION_CODEC);
props.add(COMPRESSED_TOPICS);
props.add(clientName);
return props;
}
@Override
public Collection<ValidationResult> customValidate(final ValidationContext context) {
final List<ValidationResult> errors = new ArrayList<>(super.customValidate(context));
final Integer batchMessages = context.getProperty(BATCH_NUM_MESSAGES).asInteger();
final Integer bufferMaxMessages = context.getProperty(QUEUE_BUFFERING_MAX_MESSAGES).asInteger();
if (batchMessages > bufferMaxMessages) {
errors.add(new ValidationResult.Builder().subject("Batch Size, Queue Buffer").valid(false)
.explanation("Batch Size (" + batchMessages + ") must be equal to or less than the Queue Buffer Max Count (" + bufferMaxMessages + ")").build());
}
return errors;
}
@Override
public Set<Relationship> getRelationships() {
@ -298,71 +285,131 @@ public class PutKafka extends AbstractProcessor {
return relationships;
}
@OnStopped
public void closeProducers() {
Producer<byte[], byte[]> producer;
@Override
protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
final List<ValidationResult> results = new ArrayList<>();
while ((producer = producers.poll()) != null) {
final String partitionStrategy = validationContext.getProperty(PARTITION_STRATEGY).getValue();
if (partitionStrategy.equalsIgnoreCase(USER_DEFINED_PARTITIONING.getValue()) && !validationContext.getProperty(PARTITION).isSet()) {
results.add(new ValidationResult.Builder().subject("Partition").valid(false).explanation(
"The <Partition> property must be set when configured to use the User-Defined Partitioning Strategy").build());
}
return results;
}
protected Producer<byte[], byte[]> getProducer() {
return producer;
}
@OnStopped
public void cleanup() {
final Producer<byte[], byte[]> producer = getProducer();
if (producer != null) {
producer.close();
}
for (final FlowFileMessageBatch batch : activeBatches) {
batch.cancelOrComplete();
}
}
protected ProducerConfig createConfig(final ProcessContext context) {
@OnScheduled
public void createProducer(final ProcessContext context) {
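// A single producer instance is created when the processor is scheduled; the Kafka Java producer is designed to be
// shared, so the pool of producers used by the previous implementation is no longer needed.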
producer = new KafkaProducer<byte[], byte[]>(createConfig(context), new ByteArraySerializer(), new ByteArraySerializer());
}
protected int getActiveMessageBatchCount() {
return activeBatches.size();
}
protected int getCompleteMessageBatchCount() {
return completeBatches.size();
}
protected Properties createConfig(final ProcessContext context) {
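// Translate the processor's property values into configuration keys for the Kafka Java producer client
// (bootstrap.servers, acks, batch.size, linger.ms, buffer.memory, max.request.size, compression.type).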
final String brokers = context.getProperty(SEED_BROKERS).getValue();
final Properties properties = new Properties();
properties.setProperty("metadata.broker.list", brokers);
properties.setProperty("request.required.acks", context.getProperty(DELIVERY_GUARANTEE).getValue());
properties.setProperty("bootstrap.servers", brokers);
properties.setProperty("acks", context.getProperty(DELIVERY_GUARANTEE).getValue());
properties.setProperty("client.id", context.getProperty(CLIENT_NAME).getValue());
properties.setProperty("request.timeout.ms", String.valueOf(context.getProperty(TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).longValue()));
properties.setProperty("message.send.max.retries", "1");
properties.setProperty("producer.type", context.getProperty(PRODUCER_TYPE).getValue());
properties.setProperty("batch.num.messages", context.getProperty(BATCH_NUM_MESSAGES).getValue());
final String timeout = String.valueOf(context.getProperty(TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).longValue());
properties.setProperty("timeout.ms", timeout);
properties.setProperty("metadata.fetch.timeout.ms", timeout);
properties.setProperty("batch.size", context.getProperty(BATCH_NUM_MESSAGES).getValue());
properties.setProperty("max.request.size", String.valueOf(context.getProperty(MAX_RECORD_SIZE).asDataSize(DataUnit.B).longValue()));
final long maxBufferSize = context.getProperty(MAX_BUFFER_SIZE).asDataSize(DataUnit.B).longValue();
properties.setProperty("buffer.memory", String.valueOf(maxBufferSize));
final String compressionCodec = context.getProperty(COMPRESSION_CODEC).getValue();
properties.setProperty("compression.type", compressionCodec);
final Long queueBufferingMillis = context.getProperty(QUEUE_BUFFERING_MAX).asTimePeriod(TimeUnit.MILLISECONDS);
if (queueBufferingMillis != null) {
properties.setProperty("queue.buffering.max.ms", String.valueOf(queueBufferingMillis));
}
properties.setProperty("queue.buffering.max.messages", context.getProperty(QUEUE_BUFFERING_MAX_MESSAGES).getValue());
final Long queueEnqueueTimeoutMillis = context.getProperty(QUEUE_ENQUEUE_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS);
if (queueEnqueueTimeoutMillis != null) {
properties.setProperty("queue.enqueue.timeout.ms", String.valueOf(queueEnqueueTimeoutMillis));
properties.setProperty("linger.ms", String.valueOf(queueBufferingMillis));
}
final String compressionCodec = context.getProperty(COMPRESSION_CODEC).getValue();
properties.setProperty("compression.codec", compressionCodec);
properties.setProperty("retries", "0");
properties.setProperty("block.on.buffer.full", "false");
final String compressedTopics = context.getProperty(COMPRESSED_TOPICS).getValue();
if (compressedTopics != null) {
properties.setProperty("compressed.topics", compressedTopics);
return properties;
}
private Integer getPartition(final ProcessContext context, final FlowFile flowFile, final String topic) {
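// Round Robin keeps a per-topic counter and cycles through the topic's partitions; Random returns null so that
// the Kafka client assigns the partition itself; User-Defined evaluates the <Partition> property against the FlowFile,
// treating a numeric value as a 1-based partition index and hashing any other value. The resulting index is then
// normalized modulo the number of partitions reported by the producer.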
final long unnormalizedIndex;
final String partitionStrategy = context.getProperty(PARTITION_STRATEGY).getValue();
if (partitionStrategy.equalsIgnoreCase(ROUND_ROBIN_PARTITIONING.getValue())) {
AtomicLong partitionIndex = partitionIndexMap.get(topic);
if (partitionIndex == null) {
partitionIndex = new AtomicLong(0L);
final AtomicLong existing = partitionIndexMap.putIfAbsent(topic, partitionIndex);
if (existing != null) {
partitionIndex = existing;
}
}
unnormalizedIndex = partitionIndex.getAndIncrement();
} else if (partitionStrategy.equalsIgnoreCase(RANDOM_PARTITIONING.getValue())) {
return null;
} else {
if (context.getProperty(PARTITION).isSet()) {
final String partitionValue = context.getProperty(PARTITION).evaluateAttributeExpressions(flowFile).getValue();
if (NUMBER_PATTERN.matcher(partitionValue).matches()) {
// Subtract 1 because if the partition is "3" then we want to get index 2 into the List of partitions.
unnormalizedIndex = Long.parseLong(partitionValue) - 1;
} else {
unnormalizedIndex = partitionValue.hashCode();
}
} else {
return null;
}
}
return new ProducerConfig(properties);
}
protected Producer<byte[], byte[]> createProducer(final ProcessContext context) {
return new Producer<>(createConfig(context));
}
private Producer<byte[], byte[]> borrowProducer(final ProcessContext context) {
final Producer<byte[], byte[]> producer = producers.poll();
return producer == null ? createProducer(context) : producer;
}
private void returnProducer(final Producer<byte[], byte[]> producer) {
producers.offer(producer);
final Producer<byte[], byte[]> producer = getProducer();
final List<PartitionInfo> partitionInfos = producer.partitionsFor(topic);
final int partitionIdx = (int) (unnormalizedIndex % partitionInfos.size());
return partitionInfos.get(partitionIdx).partition();
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
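// Any batches whose Kafka callbacks have all completed since the last invocation are finished here, on the
// processor thread, so that their sessions are transferred and committed outside of the producer's callback thread.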
FlowFileMessageBatch batch;
while ((batch = completeBatches.poll()) != null) {
batch.completeSession();
}
final ProcessSession session = sessionFactory.createSession();
final FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
final long start = System.nanoTime();
final String topic = context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue();
final String key = context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue();
final byte[] keyBytes = key == null ? null : key.getBytes(StandardCharsets.UTF_8);
@ -371,8 +418,7 @@ public class PutKafka extends AbstractProcessor {
delimiter = delimiter.replace("\\n", "\n").replace("\\r", "\r").replace("\\t", "\t");
}
final long maxBufferSize = context.getProperty(MAX_BUFFER_SIZE).asDataSize(DataUnit.B).longValue();
final Producer<byte[], byte[]> producer = borrowProducer(context);
final Producer<byte[], byte[]> producer = getProducer();
if (delimiter == null) {
// Send the entire FlowFile as a single message.
@ -384,31 +430,38 @@ public class PutKafka extends AbstractProcessor {
}
});
boolean error = false;
final Integer partition;
try {
final KeyedMessage<byte[], byte[]> message;
if (key == null) {
message = new KeyedMessage<>(topic, value);
} else {
message = new KeyedMessage<>(topic, keyBytes, value);
}
producer.send(message);
final long nanos = System.nanoTime() - start;
session.getProvenanceReporter().send(flowFile, "kafka://" + topic);
session.transfer(flowFile, REL_SUCCESS);
getLogger().info("Successfully sent {} to Kafka in {} millis", new Object[] { flowFile, TimeUnit.NANOSECONDS.toMillis(nanos) });
partition = getPartition(context, flowFile, topic);
} catch (final Exception e) {
getLogger().error("Failed to send {} to Kafka due to {}; routing to failure", new Object[] { flowFile, e });
getLogger().error("Failed to obtain a partition for {} due to {}", new Object[] {flowFile, e});
session.transfer(session.penalize(flowFile), REL_FAILURE);
error = true;
} finally {
if (error) {
producer.close();
} else {
returnProducer(producer);
}
session.commit();
return;
}
final ProducerRecord<byte[], byte[]> producerRecord = new ProducerRecord<>(topic, partition, keyBytes, value);
final FlowFileMessageBatch messageBatch = new FlowFileMessageBatch(session, flowFile, topic);
messageBatch.setNumMessages(1);
activeBatches.add(messageBatch);
try {
producer.send(producerRecord, new Callback() {
@Override
public void onCompletion(final RecordMetadata metadata, final Exception exception) {
if (exception == null) {
// record was successfully sent.
messageBatch.addSuccessfulRange(0L, flowFile.getSize(), metadata.offset());
} else {
messageBatch.addFailedRange(0L, flowFile.getSize(), exception);
}
}
});
} catch (final BufferExhaustedException bee) {
messageBatch.addFailedRange(0L, flowFile.getSize(), bee);
context.yield();
return;
}
} else {
final byte[] delimiterBytes = delimiter.getBytes(StandardCharsets.UTF_8);
@ -418,9 +471,9 @@ public class PutKafka extends AbstractProcessor {
// the stream of bytes in the FlowFile
final NonThreadSafeCircularBuffer buffer = new NonThreadSafeCircularBuffer(delimiterBytes);
boolean error = false;
final LongHolder lastMessageOffset = new LongHolder(0L);
final LongHolder messagesSent = new LongHolder(0L);
final FlowFileMessageBatch messageBatch = new FlowFileMessageBatch(session, flowFile, topic);
activeBatches.add(messageBatch);
try (final ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
session.read(flowFile, new InputStreamCallback() {
@ -430,13 +483,12 @@ public class PutKafka extends AbstractProcessor {
boolean streamFinished = false;
final List<KeyedMessage<byte[], byte[]>> messages = new ArrayList<>(); // batch to send
long messageBytes = 0L; // size of messages in the 'messages' list
int nextByte;
try (final InputStream bufferedIn = new BufferedInputStream(rawIn);
final ByteCountingInputStream in = new ByteCountingInputStream(bufferedIn)) {
long messageStartOffset = in.getBytesConsumed();
// read until we're out of data.
while (!streamFinished) {
nextByte = in.read();
@ -457,107 +509,309 @@ public class PutKafka extends AbstractProcessor {
}
if (data != null) {
final long messageEndOffset = in.getBytesConsumed();
// If the message has no data, ignore it.
if (data.length != 0) {
// either we ran out of data or we reached the end of the message.
// Either way, create the message because it's ready to send.
final KeyedMessage<byte[], byte[]> message;
if (key == null) {
message = new KeyedMessage<>(topic, data);
} else {
message = new KeyedMessage<>(topic, keyBytes, data);
final Integer partition;
try {
partition = getPartition(context, flowFile, topic);
} catch (final Exception e) {
messageBatch.addFailedRange(messageStartOffset, messageEndOffset, e);
getLogger().error("Failed to obtain a partition for {} due to {}", new Object[] {flowFile, e});
continue;
}
// Add the message to the list of messages ready to send. If we've reached our
// threshold of how many we're willing to send (or if we're out of data), go ahead
// and send the whole List.
messages.add(message);
messageBytes += data.length;
if (messageBytes >= maxBufferSize || streamFinished) {
// send the messages, then reset our state.
try {
producer.send(messages);
} catch (final Exception e) {
// we wrap the general exception in ProcessException because we want to separate
// failures in sending messages from general Exceptions that would indicate bugs
// in the Processor. Failure to send a message should be handled appropriately, but
// we don't want to catch the general Exception or RuntimeException in order to catch
// failures from Kafka's Producer.
throw new ProcessException("Failed to send messages to Kafka", e);
}
messagesSent.addAndGet(messages.size()); // count number of messages sent
final ProducerRecord<byte[], byte[]> producerRecord = new ProducerRecord<>(topic, partition, keyBytes, data);
final long rangeStart = messageStartOffset;
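// rangeStart is captured here so that the asynchronous callback below reports the byte range of exactly this message.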
// reset state
messages.clear();
messageBytes = 0;
try {
producer.send(producerRecord, new Callback() {
@Override
public void onCompletion(final RecordMetadata metadata, final Exception exception) {
if (exception == null) {
// record was successfully sent.
messageBatch.addSuccessfulRange(rangeStart, messageEndOffset, metadata.offset());
} else {
messageBatch.addFailedRange(rangeStart, messageEndOffset, exception);
}
}
});
// We've successfully sent a batch of messages. Keep track of the byte offset in the
// FlowFile of the last successfully sent message. This way, if the messages cannot
// all be successfully sent, we know where to split off the data. This allows us to then
// split off the first X number of bytes and send to 'success' and then split off the rest
// and send them to 'failure'.
lastMessageOffset.set(in.getBytesConsumed());
messagesSent.incrementAndGet();
} catch (final BufferExhaustedException bee) {
// Not enough room in the buffer. Add from the beginning of this message to end of FlowFile as a failed range
messageBatch.addFailedRange(messageStartOffset, flowFile.getSize(), bee);
context.yield();
return;
}
}
// reset BAOS so that we can start a new message.
baos.reset();
data = null;
}
}
// If there are messages left, send them
if (!messages.isEmpty()) {
try {
messagesSent.addAndGet(messages.size()); // add count of messages
producer.send(messages);
} catch (final Exception e) {
throw new ProcessException("Failed to send messages to Kafka", e);
messageStartOffset = in.getBytesConsumed();
}
}
}
}
});
final long nanos = System.nanoTime() - start;
session.getProvenanceReporter().send(flowFile, "kafka://" + topic, "Sent " + messagesSent.get() + " messages");
session.transfer(flowFile, REL_SUCCESS);
getLogger().info("Successfully sent {} messages to Kafka for {} in {} millis", new Object[] { messagesSent.get(), flowFile, TimeUnit.NANOSECONDS.toMillis(nanos) });
} catch (final ProcessException pe) {
error = true;
// There was a failure sending messages to Kafka. Iff the lastMessageOffset is 0, then all of them failed and we can
// just route the FlowFile to failure. Otherwise, some messages were successful, so split them off and send them to
// 'success' while we send the others to 'failure'.
final long offset = lastMessageOffset.get();
if (offset == 0L) {
// all of the messages failed to send. Route FlowFile to failure
getLogger().error("Failed to send {} to Kafka due to {}; routing to fialure", new Object[] { flowFile, pe.getCause() });
session.transfer(session.penalize(flowFile), REL_FAILURE);
} else {
// Some of the messages were sent successfully. We want to split off the successful messages from the failed messages.
final FlowFile successfulMessages = session.clone(flowFile, 0L, offset);
final FlowFile failedMessages = session.clone(flowFile, offset, flowFile.getSize() - offset);
getLogger().error("Successfully sent {} of the messages from {} but then failed to send the rest. Original FlowFile split into"
+ " two: {} routed to 'success', {} routed to 'failure'. Failure was due to {}", new Object[] {
messagesSent.get(), flowFile, successfulMessages, failedMessages, pe.getCause() });
session.transfer(successfulMessages, REL_SUCCESS);
session.transfer(session.penalize(failedMessages), REL_FAILURE);
session.remove(flowFile);
session.getProvenanceReporter().send(successfulMessages, "kafka://" + topic);
}
} finally {
if (error) {
producer.close();
} else {
returnProducer(producer);
}
messageBatch.setNumMessages(messagesSent.get());
}
}
}
private static class Range {
private final long start;
private final long end;
private final Long kafkaOffset;
public Range(final long start, final long end, final Long kafkaOffset) {
this.start = start;
this.end = end;
this.kafkaOffset = kafkaOffset;
}
public long getStart() {
return start;
}
public long getEnd() {
return end;
}
public Long getKafkaOffset() {
return kafkaOffset;
}
@Override
public String toString() {
return "Range[" + start + "-" + end + "]";
}
}
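// Tracks the byte ranges of a single FlowFile that Kafka has acknowledged (successfully or not) via its asynchronous
// callbacks. Once every expected message has been accounted for, the batch is queued so that a later onTrigger call
// can transfer the FlowFile, or split it into successful and failed children, and commit the session.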
private class FlowFileMessageBatch {
private final ProcessSession session;
private final FlowFile flowFile;
private final String topic;
private final long startTime = System.nanoTime();
private final List<Range> successfulRanges = new ArrayList<>();
private final List<Range> failedRanges = new ArrayList<>();
private Exception lastFailureReason;
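// -1 indicates that the total number of messages in this batch is not yet known; it is set via setNumMessages.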
private long numMessages = -1L;
private long completeTime = 0L;
private boolean canceled = false;
public FlowFileMessageBatch(final ProcessSession session, final FlowFile flowFile, final String topic) {
this.session = session;
this.flowFile = flowFile;
this.topic = topic;
}
public synchronized void cancelOrComplete() {
if (isComplete()) {
completeSession();
return;
}
this.canceled = true;
session.rollback();
successfulRanges.clear();
failedRanges.clear();
}
public synchronized void addSuccessfulRange(final long start, final long end, final long kafkaOffset) {
if (canceled) {
return;
}
successfulRanges.add(new Range(start, end, kafkaOffset));
if (isComplete()) {
activeBatches.remove(this);
completeBatches.add(this);
completeTime = System.nanoTime();
}
}
public synchronized void addFailedRange(final long start, final long end, final Exception e) {
if (canceled) {
return;
}
failedRanges.add(new Range(start, end, null));
lastFailureReason = e;
if (isComplete()) {
activeBatches.remove(this);
completeBatches.add(this);
completeTime = System.nanoTime();
}
}
private boolean isComplete() {
return !canceled && (numMessages > -1) && (successfulRanges.size() + failedRanges.size() >= numMessages);
}
public synchronized void setNumMessages(final long msgCount) {
this.numMessages = msgCount;
if (isComplete()) {
activeBatches.remove(this);
completeBatches.add(this);
completeTime = System.nanoTime();
}
}
private Long getMin(final Long a, final Long b) {
if (a == null && b == null) {
return null;
}
if (a == null) {
return b;
}
if (b == null) {
return a;
}
return Math.min(a, b);
}
private Long getMax(final Long a, final Long b) {
if (a == null && b == null) {
return null;
}
if (a == null) {
return b;
}
if (b == null) {
return a;
}
return Math.max(a, b);
}
private void transferRanges(final List<Range> ranges, final Relationship relationship) {
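// Sort by start offset and coalesce contiguous ranges so that each run of adjacent messages becomes a single
// child FlowFile rather than one child per message.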
Collections.sort(ranges, new Comparator<Range>() {
@Override
public int compare(final Range o1, final Range o2) {
return Long.compare(o1.getStart(), o2.getStart());
}
});
for (int i = 0; i < ranges.size(); i++) {
Range range = ranges.get(i);
int count = 1;
Long smallestKafkaOffset = range.getKafkaOffset();
Long largestKafkaOffset = range.getKafkaOffset();
while (i + 1 < ranges.size()) {
// Check if the next range in the List continues where this one left off.
final Range nextRange = ranges.get(i + 1);
if (nextRange.getStart() == range.getEnd()) {
// We have two ranges in a row that are contiguous; combine them into a single Range.
range = new Range(range.getStart(), nextRange.getEnd(), null);
smallestKafkaOffset = getMin(smallestKafkaOffset, nextRange.getKafkaOffset());
largestKafkaOffset = getMax(largestKafkaOffset, nextRange.getKafkaOffset());
count++;
i++;
} else {
break;
}
}
// Create a FlowFile for this range.
FlowFile child = session.clone(flowFile, range.getStart(), range.getEnd() - range.getStart());
if (relationship == REL_SUCCESS) {
session.getProvenanceReporter().send(child, getTransitUri(), "Sent " + count + " messages; Kafka offsets range from " + smallestKafkaOffset + " to " + largestKafkaOffset);
session.transfer(child, relationship);
} else {
session.transfer(session.penalize(child), relationship);
}
}
}
private String getTransitUri() {
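// The transit URI is derived from the leader of the topic's first partition as reported by the producer,
// falling back to 'unknown-host' when no partition metadata is available.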
final List<PartitionInfo> partitions = getProducer().partitionsFor(topic);
if (partitions.isEmpty()) {
return "kafka://unknown-host" + "/topics/" + topic;
}
final PartitionInfo info = partitions.get(0);
final Node leader = info.leader();
final String host = leader.host();
final int port = leader.port();
return "kafka://" + host + ":" + port + "/topics/" + topic;
}
public synchronized void completeSession() {
if (canceled) {
return;
}
if (successfulRanges.isEmpty() && failedRanges.isEmpty()) {
getLogger().info("Completed processing {} but sent 0 FlowFiles to Kafka", new Object[] {flowFile});
session.transfer(flowFile, REL_SUCCESS);
session.commit();
return;
}
if (successfulRanges.isEmpty()) {
getLogger().error("Failed to send {} to Kafka; routing to 'failure'; last failure reason reported was {};", new Object[] {flowFile, lastFailureReason});
session.transfer(session.penalize(flowFile), REL_FAILURE);
session.commit();
return;
}
if (failedRanges.isEmpty()) {
final long transferMillis = TimeUnit.NANOSECONDS.toMillis(completeTime - startTime);
if (successfulRanges.size() == 1) {
final Long kafkaOffset = successfulRanges.get(0).getKafkaOffset();
final String msg = "Sent 1 message" + ((kafkaOffset == null) ? "" : ("; Kafka offset = " + kafkaOffset));
session.getProvenanceReporter().send(flowFile, getTransitUri(), msg);
} else {
long smallestKafkaOffset = successfulRanges.get(0).getKafkaOffset();
long largestKafkaOffset = successfulRanges.get(0).getKafkaOffset();
for (final Range range : successfulRanges) {
smallestKafkaOffset = Math.min(smallestKafkaOffset, range.getKafkaOffset());
largestKafkaOffset = Math.max(largestKafkaOffset, range.getKafkaOffset());
}
session.getProvenanceReporter().send(flowFile, getTransitUri(),
"Sent " + successfulRanges.size() + " messages; Kafka offsets range from " + smallestKafkaOffset + " to " + largestKafkaOffset);
}
session.transfer(flowFile, REL_SUCCESS);
getLogger().info("Successfully sent {} messages to Kafka for {} in {} millis", new Object[] {successfulRanges.size(), flowFile, transferMillis});
session.commit();
return;
}
// At this point, the successful ranges is not empty and the failed ranges is not empty. This indicates that some messages made their way to Kafka
// successfully and some failed. We will address this by splitting apart the source FlowFile into children and sending the successful messages to 'success'
// and the failed messages to 'failure'.
transferRanges(successfulRanges, REL_SUCCESS);
transferRanges(failedRanges, REL_FAILURE);
session.remove(flowFile);
getLogger().error("Successfully sent {} messages to Kafka but failed to send {} messages; the last error received was {}",
new Object[] {successfulRanges.size(), failedRanges.size(), lastFailureReason});
session.commit();
}
}
}

View File

@ -22,27 +22,33 @@ import static org.junit.Assert.assertTrue;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;
import kafka.common.FailedToSendMessageException;
import kafka.javaapi.producer.Producer;
import kafka.message.CompressionCodec;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.processor.ProcessContext;
import org.apache.kafka.clients.producer.BufferExhaustedException;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Ignore;
import org.junit.Assert;
import org.junit.Test;
import scala.collection.Seq;
import kafka.common.FailedToSendMessageException;
public class TestPutKafka {
@ -56,24 +62,19 @@ public class TestPutKafka {
runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n");
runner.enqueue("Hello World\nGoodbye\n1\n2\n3\n4\n5\n6\n7\n8\n9".getBytes());
runner.run();
runner.run(2); // we have to run twice because the first iteration will result in data being added to a queue in the processor; the second onTrigger call will transfer FlowFiles.
runner.assertAllFlowFilesTransferred(PutKafka.REL_SUCCESS, 1);
final List<byte[]> messages = proc.getProducer().getMessages();
final List<ProducerRecord<byte[], byte[]>> messages = ((MockProducer) proc.getProducer()).getMessages();
assertEquals(11, messages.size());
assertTrue(Arrays.equals("Hello World".getBytes(StandardCharsets.UTF_8), messages.get(0)));
assertTrue(Arrays.equals("Goodbye".getBytes(StandardCharsets.UTF_8), messages.get(1)));
assertTrue(Arrays.equals("1".getBytes(StandardCharsets.UTF_8), messages.get(2)));
assertTrue(Arrays.equals("2".getBytes(StandardCharsets.UTF_8), messages.get(3)));
assertTrue(Arrays.equals("3".getBytes(StandardCharsets.UTF_8), messages.get(4)));
assertTrue(Arrays.equals("4".getBytes(StandardCharsets.UTF_8), messages.get(5)));
assertTrue(Arrays.equals("5".getBytes(StandardCharsets.UTF_8), messages.get(6)));
assertTrue(Arrays.equals("6".getBytes(StandardCharsets.UTF_8), messages.get(7)));
assertTrue(Arrays.equals("7".getBytes(StandardCharsets.UTF_8), messages.get(8)));
assertTrue(Arrays.equals("8".getBytes(StandardCharsets.UTF_8), messages.get(9)));
assertTrue(Arrays.equals("9".getBytes(StandardCharsets.UTF_8), messages.get(10)));
assertTrue(Arrays.equals("Hello World".getBytes(StandardCharsets.UTF_8), messages.get(0).value()));
assertTrue(Arrays.equals("Goodbye".getBytes(StandardCharsets.UTF_8), messages.get(1).value()));
for (int i = 1; i <= 9; i++) {
assertTrue(Arrays.equals(String.valueOf(i).getBytes(StandardCharsets.UTF_8), messages.get(i + 1).value()));
}
}
@Test
@ -87,7 +88,7 @@ public class TestPutKafka {
final String text = "Hello World\nGoodbye\n1\n2\n3\n4\n5\n6\n7\n8\n9";
runner.enqueue(text.getBytes());
runner.run();
runner.run(2);
runner.assertAllFlowFilesTransferred(PutKafka.REL_FAILURE, 1);
final MockFlowFile mff = runner.getFlowFilesForRelationship(PutKafka.REL_FAILURE).get(0);
@ -96,7 +97,7 @@ public class TestPutKafka {
@Test
public void testPartialFailure() {
final TestableProcessor proc = new TestableProcessor(2);
final TestableProcessor proc = new TestableProcessor(2); // fail after sending 2 messages.
final TestRunner runner = TestRunners.newTestRunner(proc);
runner.setProperty(PutKafka.TOPIC, "topic1");
runner.setProperty(PutKafka.KEY, "key1");
@ -106,7 +107,7 @@ public class TestPutKafka {
final byte[] bytes = "1\n2\n3\n4".getBytes();
runner.enqueue(bytes);
runner.run();
runner.run(2);
runner.assertTransferCount(PutKafka.REL_SUCCESS, 1);
runner.assertTransferCount(PutKafka.REL_FAILURE, 1);
@ -118,6 +119,39 @@ public class TestPutKafka {
failureFF.assertContentEquals("3\n4");
}
@Test
public void testPartialFailureWithSuccessBeforeAndAfter() {
final TestableProcessor proc = new TestableProcessor(2, 4); // fail after sending 2 messages, then stop failing after 4
final TestRunner runner = TestRunners.newTestRunner(proc);
runner.setProperty(PutKafka.TOPIC, "topic1");
runner.setProperty(PutKafka.KEY, "key1");
runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234");
runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n");
runner.setProperty(PutKafka.MAX_BUFFER_SIZE, "1 B");
final byte[] bytes = "1\n2\n3\n4\n5\n6".getBytes();
runner.enqueue(bytes);
runner.run(2);
runner.assertTransferCount(PutKafka.REL_SUCCESS, 2);
runner.assertTransferCount(PutKafka.REL_FAILURE, 1);
final List<MockFlowFile> success = runner.getFlowFilesForRelationship(PutKafka.REL_SUCCESS);
for (final MockFlowFile successFF : success) {
if ('1' == successFF.toByteArray()[0]) {
successFF.assertContentEquals("1\n2\n");
} else if ('5' == successFF.toByteArray()[0]) {
successFF.assertContentEquals("5\n6");
} else {
Assert.fail("Wrong content for FlowFile; contained " + new String(successFF.toByteArray()));
}
}
final MockFlowFile failureFF = runner.getFlowFilesForRelationship(PutKafka.REL_FAILURE).get(0);
failureFF.assertContentEquals("3\n4\n");
}
@Test
public void testWithEmptyMessages() {
final TestableProcessor proc = new TestableProcessor();
@ -129,16 +163,16 @@ public class TestPutKafka {
final byte[] bytes = "\n\n\n1\n2\n\n\n\n3\n4\n\n\n".getBytes();
runner.enqueue(bytes);
runner.run();
runner.run(2);
runner.assertAllFlowFilesTransferred(PutKafka.REL_SUCCESS, 1);
final List<byte[]> msgs = proc.getProducer().getMessages();
final List<ProducerRecord<byte[], byte[]>> msgs = ((MockProducer) proc.getProducer()).getMessages();
assertEquals(4, msgs.size());
assertTrue(Arrays.equals("1".getBytes(), msgs.get(0)));
assertTrue(Arrays.equals("2".getBytes(), msgs.get(1)));
assertTrue(Arrays.equals("3".getBytes(), msgs.get(2)));
assertTrue(Arrays.equals("4".getBytes(), msgs.get(3)));
for (int i = 1; i <= 4; i++) {
assertTrue(Arrays.equals(String.valueOf(i).getBytes(), msgs.get(i - 1).value()));
}
}
@Test
@ -154,14 +188,14 @@ public class TestPutKafka {
final byte[] bytes = "\n\n\n1\n2\n\n\n\n3\n4\n\n\n".getBytes();
runner.enqueue(bytes);
runner.run();
runner.run(2);
final List<ProvenanceEventRecord> events = runner.getProvenanceEvents();
assertEquals(1, events.size());
final ProvenanceEventRecord event = events.get(0);
assertEquals(ProvenanceEventType.SEND, event.getEventType());
assertEquals("kafka://topic1", event.getTransitUri());
assertEquals("Sent 4 messages", event.getDetails());
assertEquals("kafka://localhost:1111/topics/topic1", event.getTransitUri());
assertTrue(event.getDetails().startsWith("Sent 4 messages"));
}
@Test
@ -175,265 +209,270 @@ public class TestPutKafka {
final byte[] bytes = "\n\n\n1\n2\n\n\n\n3\n4\n\n\n".getBytes();
runner.enqueue(bytes);
runner.run();
runner.run(2);
final List<ProvenanceEventRecord> events = runner.getProvenanceEvents();
assertEquals(1, events.size());
final ProvenanceEventRecord event = events.get(0);
assertEquals(ProvenanceEventType.SEND, event.getEventType());
assertEquals("kafka://topic1", event.getTransitUri());
assertEquals("kafka://localhost:1111/topics/topic1", event.getTransitUri());
}
@Test
@Ignore("Intended only for local testing; requires an actual running instance of Kafka & ZooKeeper...")
public void testKeyValuePut() {
final TestRunner runner = TestRunners.newTestRunner(PutKafka.class);
runner.setProperty(PutKafka.SEED_BROKERS, "192.168.0.101:9092");
runner.setProperty(PutKafka.TOPIC, "${kafka.topic}");
runner.setProperty(PutKafka.KEY, "${kafka.key}");
runner.setProperty(PutKafka.TIMEOUT, "3 secs");
runner.setProperty(PutKafka.DELIVERY_GUARANTEE, PutKafka.DELIVERY_REPLICATED.getValue());
public void testRoundRobinAcrossMultipleMessages() {
final TestableProcessor proc = new TestableProcessor();
keyValuePutExecute(runner);
}
final TestRunner runner = TestRunners.newTestRunner(proc);
runner.setProperty(PutKafka.TOPIC, "topic1");
runner.setProperty(PutKafka.KEY, "key1");
runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234");
runner.setProperty(PutKafka.PARTITION_STRATEGY, PutKafka.ROUND_ROBIN_PARTITIONING);
@Test
@Ignore("Intended only for local testing; requires an actual running instance of Kafka & ZooKeeper...")
public void testKeyValuePutAsync() {
final TestRunner runner = TestRunners.newTestRunner(PutKafka.class);
runner.setProperty(PutKafka.SEED_BROKERS, "192.168.0.101:9092");
runner.setProperty(PutKafka.TOPIC, "${kafka.topic}");
runner.setProperty(PutKafka.KEY, "${kafka.key}");
runner.setProperty(PutKafka.TIMEOUT, "3 secs");
runner.setProperty(PutKafka.PRODUCER_TYPE, PutKafka.PRODUCTER_TYPE_ASYNCHRONOUS.getValue());
runner.setProperty(PutKafka.DELIVERY_GUARANTEE, PutKafka.DELIVERY_REPLICATED.getValue());
keyValuePutExecute(runner);
}
private void keyValuePutExecute(final TestRunner runner) {
final Map<String, String> attributes = new HashMap<>();
attributes.put("kafka.topic", "test");
attributes.put("kafka.key", "key3");
final byte[] data = "Hello, World, Again! ;)".getBytes();
runner.enqueue(data, attributes);
runner.enqueue(data, attributes);
runner.enqueue(data, attributes);
runner.enqueue(data, attributes);
runner.enqueue("hello".getBytes());
runner.enqueue("there".getBytes());
runner.enqueue("how are you".getBytes());
runner.enqueue("today".getBytes());
runner.run(5);
runner.assertAllFlowFilesTransferred(PutKafka.REL_SUCCESS, 4);
final List<MockFlowFile> mffs = runner.getFlowFilesForRelationship(PutKafka.REL_SUCCESS);
final MockFlowFile mff = mffs.get(0);
assertTrue(Arrays.equals(data, mff.toByteArray()));
final List<ProducerRecord<byte[], byte[]>> records = ((MockProducer) proc.getProducer()).getMessages();
for (int i = 0; i < 3; i++) {
assertEquals(i + 1, records.get(i).partition().intValue());
}
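// after cycling through the three partitions reported by the MockProducer, round-robin wraps back to the first one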
assertEquals(1, records.get(3).partition().intValue());
}
@Test
public void testProducerConfigDefault() {
final TestableProcessor processor = new TestableProcessor();
final TestRunner runner = TestRunners.newTestRunner(processor);
public void testRoundRobinAcrossMultipleMessagesInSameFlowFile() {
final TestableProcessor proc = new TestableProcessor();
final TestRunner runner = TestRunners.newTestRunner(proc);
runner.setProperty(PutKafka.TOPIC, "topic1");
runner.setProperty(PutKafka.KEY, "key1");
runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234");
runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n");
runner.setProperty(PutKafka.PARTITION_STRATEGY, PutKafka.ROUND_ROBIN_PARTITIONING);
runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\n");
final ProcessContext context = runner.getProcessContext();
final ProducerConfig config = processor.createConfig(context);
runner.enqueue("hello\nthere\nhow are you\ntoday".getBytes());
// Check the codec
final CompressionCodec codec = config.compressionCodec();
assertTrue(codec instanceof kafka.message.NoCompressionCodec$);
runner.run(2);
// Check compressed topics
final Seq<String> compressedTopics = config.compressedTopics();
assertEquals(0, compressedTopics.size());
runner.assertAllFlowFilesTransferred(PutKafka.REL_SUCCESS, 1);
// Check the producer type
final String actualProducerType = config.producerType();
assertEquals(PutKafka.PRODUCER_TYPE.getDefaultValue(), actualProducerType);
final List<ProducerRecord<byte[], byte[]>> records = ((MockProducer) proc.getProducer()).getMessages();
for (int i = 0; i < 3; i++) {
assertEquals(i + 1, records.get(i).partition().intValue());
}
assertEquals(1, records.get(3).partition().intValue());
}
@Test
public void testProducerConfigAsyncWithCompression() {
final TestableProcessor processor = new TestableProcessor();
final TestRunner runner = TestRunners.newTestRunner(processor);
public void testUserDefinedPartition() {
final TestableProcessor proc = new TestableProcessor();
final TestRunner runner = TestRunners.newTestRunner(proc);
runner.setProperty(PutKafka.TOPIC, "topic1");
runner.setProperty(PutKafka.KEY, "key1");
runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234");
runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n");
runner.setProperty(PutKafka.PRODUCER_TYPE, PutKafka.PRODUCTER_TYPE_ASYNCHRONOUS.getValue());
runner.setProperty(PutKafka.COMPRESSION_CODEC, PutKafka.COMPRESSION_CODEC_SNAPPY.getValue());
runner.setProperty(PutKafka.COMPRESSED_TOPICS, "topic01,topic02,topic03");
runner.setProperty(PutKafka.PARTITION_STRATEGY, PutKafka.USER_DEFINED_PARTITIONING);
runner.setProperty(PutKafka.PARTITION, "${part}");
runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\n");
final ProcessContext context = runner.getProcessContext();
final ProducerConfig config = processor.createConfig(context);
final Map<String, String> attrs = new HashMap<>();
attrs.put("part", "3");
runner.enqueue("hello\nthere\nhow are you\ntoday".getBytes(), attrs);
// Check that the codec is snappy
final CompressionCodec codec = config.compressionCodec();
assertTrue(codec instanceof kafka.message.SnappyCompressionCodec$);
runner.run(2);
// Check compressed topics
final Seq<String> compressedTopics = config.compressedTopics();
assertEquals(3, compressedTopics.size());
assertTrue(compressedTopics.contains("topic01"));
assertTrue(compressedTopics.contains("topic02"));
assertTrue(compressedTopics.contains("topic03"));
// Check the producer type
final String actualProducerType = config.producerType();
assertEquals("async", actualProducerType);
runner.assertAllFlowFilesTransferred(PutKafka.REL_SUCCESS, 1);
final List<ProducerRecord<byte[], byte[]>> records = ((MockProducer) proc.getProducer()).getMessages();
for (int i = 0; i < 4; i++) {
assertEquals(3, records.get(i).partition().intValue());
}
}
@Test
public void testProducerConfigAsyncQueueThresholds() {
final TestableProcessor processor = new TestableProcessor();
final TestRunner runner = TestRunners.newTestRunner(processor);
public void testUserDefinedPartitionWithInvalidValue() {
final TestableProcessor proc = new TestableProcessor();
final TestRunner runner = TestRunners.newTestRunner(proc);
runner.setProperty(PutKafka.TOPIC, "topic1");
runner.setProperty(PutKafka.KEY, "key1");
runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234");
runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n");
runner.setProperty(PutKafka.PRODUCER_TYPE, PutKafka.PRODUCTER_TYPE_ASYNCHRONOUS.getValue());
runner.setProperty(PutKafka.QUEUE_BUFFERING_MAX, "7 secs");
runner.setProperty(PutKafka.QUEUE_BUFFERING_MAX_MESSAGES, "535");
runner.setProperty(PutKafka.QUEUE_ENQUEUE_TIMEOUT, "200 ms");
runner.setProperty(PutKafka.PARTITION_STRATEGY, PutKafka.USER_DEFINED_PARTITIONING);
runner.setProperty(PutKafka.PARTITION, "${part}");
runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\n");
final ProcessContext context = runner.getProcessContext();
final ProducerConfig config = processor.createConfig(context);
final Map<String, String> attrs = new HashMap<>();
attrs.put("part", "bogus");
runner.enqueue("hello\nthere\nhow are you\ntoday".getBytes(), attrs);
// Check that the queue thresholds were properly translated
assertEquals(7000, config.queueBufferingMaxMs());
assertEquals(535, config.queueBufferingMaxMessages());
assertEquals(200, config.queueEnqueueTimeoutMs());
runner.run(2);
// Check the producer type
final String actualProducerType = config.producerType();
assertEquals("async", actualProducerType);
runner.assertAllFlowFilesTransferred(PutKafka.REL_SUCCESS, 1);
final List<ProducerRecord<byte[], byte[]>> records = ((MockProducer) proc.getProducer()).getMessages();
// all records should land on the same partition, regardless of which partition that happens to be.
final int partition = records.get(0).partition().intValue();
for (int i = 0; i < 4; i++) {
assertEquals(partition, records.get(i).partition().intValue());
}
}
@Test
public void testProducerConfigInvalidBatchSize() {
final TestableProcessor processor = new TestableProcessor();
final TestRunner runner = TestRunners.newTestRunner(processor);
public void testFullBuffer() {
final TestableProcessor proc = new TestableProcessor();
final TestRunner runner = TestRunners.newTestRunner(proc);
runner.setProperty(PutKafka.TOPIC, "topic1");
runner.setProperty(PutKafka.KEY, "key1");
runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234");
runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n");
runner.setProperty(PutKafka.PRODUCER_TYPE, PutKafka.PRODUCTER_TYPE_ASYNCHRONOUS.getValue());
runner.setProperty(PutKafka.BATCH_NUM_MESSAGES, "200");
runner.setProperty(PutKafka.QUEUE_BUFFERING_MAX_MESSAGES, "100");
runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\n");
runner.setProperty(PutKafka.MAX_BUFFER_SIZE, "5 B");
proc.setMaxQueueSize(10L); // each record takes 4 bytes for the key and 1 byte for the value, so only the first two records fit in the queue.
runner.assertNotValid();
runner.enqueue("1\n2\n3\n4\n".getBytes());
runner.run(2);
runner.assertTransferCount(PutKafka.REL_SUCCESS, 1);
runner.assertTransferCount(PutKafka.REL_FAILURE, 1);
runner.getFlowFilesForRelationship(PutKafka.REL_SUCCESS).get(0).assertContentEquals("1\n2\n");
runner.getFlowFilesForRelationship(PutKafka.REL_FAILURE).get(0).assertContentEquals("3\n4\n");
}
@Test
public void testProducerConfigAsyncDefaultEnqueueTimeout() {
final TestableProcessor processor = new TestableProcessor();
final TestRunner runner = TestRunners.newTestRunner(processor);
runner.setProperty(PutKafka.TOPIC, "topic1");
runner.setProperty(PutKafka.KEY, "key1");
runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234");
runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n");
runner.setProperty(PutKafka.PRODUCER_TYPE, PutKafka.PRODUCTER_TYPE_ASYNCHRONOUS.getValue());
// Do not set QUEUE_ENQUEUE_TIMEOUT
final ProcessContext context = runner.getProcessContext();
final ProducerConfig config = processor.createConfig(context);
// Check that the enqueue timeout defaults to -1
assertEquals(-1, config.queueEnqueueTimeoutMs());
// Check the producer type
final String actualProducerType = config.producerType();
assertEquals("async", actualProducerType);
}
/**
* Used to override the {@link #getProducer()} method so that we can enforce that our MockProducer is used
*/
private static class TestableProcessor extends PutKafka {
private MockProducer producer;
private int failAfter = Integer.MAX_VALUE;
private final MockProducer producer;
public TestableProcessor() {
this(null);
}
public TestableProcessor(final int failAfter) {
this.failAfter = failAfter;
public TestableProcessor(final Integer failAfter) {
this(failAfter, null);
}
@OnScheduled
public void instantiateProducer(final ProcessContext context) {
producer = new MockProducer(createConfig(context));
public TestableProcessor(final Integer failAfter, final Integer stopFailingAfter) {
producer = new MockProducer();
producer.setFailAfter(failAfter);
producer.setStopFailingAfter(stopFailingAfter);
}
@Override
protected Producer<byte[], byte[]> createProducer(final ProcessContext context) {
protected Producer<byte[], byte[]> getProducer() {
return producer;
}
public MockProducer getProducer() {
return producer;
}
/**
* Exposed for test verification
*/
@Override
public ProducerConfig createConfig(final ProcessContext context) {
return super.createConfig(context);
public void setMaxQueueSize(final long bytes) {
producer.setMaxQueueSize(bytes);
}
}
private static class MockProducer extends Producer<byte[], byte[]> {
/**
* We have our own Mock Producer, which is very similar to the Kafka-supplied one. However, with the Kafka-supplied
* Producer, we don't have the ability to tell it to fail after X number of messages; rather, we can only tell it
* to fail on the next message. Since we are sending multiple messages in a single onTrigger call for the Processor,
* this doesn't allow us to test failure conditions adequately.
*/
private static class MockProducer implements Producer<byte[], byte[]> {
private int sendCount = 0;
private int failAfter = Integer.MAX_VALUE;
private Integer failAfter;
private Integer stopFailingAfter;
private long queueSize = 0L;
private long maxQueueSize = Long.MAX_VALUE;
private final List<byte[]> messages = new ArrayList<>();
private final List<ProducerRecord<byte[], byte[]>> messages = new ArrayList<>();
public MockProducer(final ProducerConfig config) {
super(config);
public MockProducer() {
}
@Override
public void send(final KeyedMessage<byte[], byte[]> message) {
if (++sendCount > failAfter) {
throw new FailedToSendMessageException("Failed to send message", new RuntimeException("Unit test told to fail after " + failAfter + " successful messages"));
} else {
messages.add(message.message());
}
public void setMaxQueueSize(final long bytes) {
this.maxQueueSize = bytes;
}
public List<byte[]> getMessages() {
public List<ProducerRecord<byte[], byte[]>> getMessages() {
return messages;
}
@Override
public void send(final List<KeyedMessage<byte[], byte[]>> messages) {
for (final KeyedMessage<byte[], byte[]> msg : messages) {
send(msg);
}
public void setFailAfter(final Integer successCount) {
failAfter = successCount;
}
public void setFailAfter(final int successCount) {
failAfter = successCount;
public void setStopFailingAfter(final Integer stopFailingAfter) {
this.stopFailingAfter = stopFailingAfter;
}
@Override
public Future<RecordMetadata> send(final ProducerRecord<byte[], byte[]> record) {
return send(record, null);
}
@Override
public Future<RecordMetadata> send(final ProducerRecord<byte[], byte[]> record, final Callback callback) {
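// Simulates the real producer for testing: serialized key and value bytes are counted against maxQueueSize to
// trigger BufferExhaustedException, and sends numbered between failAfter and stopFailingAfter complete their
// callbacks with an exception instead of metadata.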
sendCount++;
final ByteArraySerializer serializer = new ByteArraySerializer();
final int keyBytes = serializer.serialize(record.topic(), record.key()).length;
final int valueBytes = serializer.serialize(record.topic(), record.value()).length;
if (maxQueueSize - queueSize < keyBytes + valueBytes) {
throw new BufferExhaustedException("Queue size is " + queueSize + " but serialized message is " + (keyBytes + valueBytes));
}
queueSize += keyBytes + valueBytes;
if (failAfter != null && sendCount > failAfter && ((stopFailingAfter == null) || (sendCount < stopFailingAfter + 1))) {
final Exception e = new FailedToSendMessageException("Failed to send message", new RuntimeException("Unit test told to fail after " + failAfter + " successful messages"));
callback.onCompletion(null, e);
} else {
messages.add(record);
final RecordMetadata meta = new RecordMetadata(new TopicPartition(record.topic(), record.partition() == null ? 1 : record.partition()), 0L, 0L);
callback.onCompletion(meta, null);
}
// we don't actually look at the Future in the processor, so we can just return null
return null;
}
@Override
public List<PartitionInfo> partitionsFor(String topic) {
final Node leader = new Node(1, "localhost", 1111);
final Node node2 = new Node(2, "localhost-2", 2222);
final Node node3 = new Node(3, "localhost-3", 3333);
final PartitionInfo partInfo1 = new PartitionInfo(topic, 1, leader, new Node[] {node2, node3}, new Node[0]);
final PartitionInfo partInfo2 = new PartitionInfo(topic, 2, leader, new Node[] {node2, node3}, new Node[0]);
final PartitionInfo partInfo3 = new PartitionInfo(topic, 3, leader, new Node[] {node2, node3}, new Node[0]);
final List<PartitionInfo> infos = new ArrayList<>(3);
infos.add(partInfo1);
infos.add(partInfo2);
infos.add(partInfo3);
return infos;
}
@Override
public Map<MetricName, ? extends Metric> metrics() {
return Collections.emptyMap();
}
@Override
public void close() {
}
}

View File

@ -168,8 +168,12 @@ public class ExecuteSQL extends AbstractProcessor {
session.getProvenanceReporter().modifyContent(outgoing, "Retrieved " + nrOfRows.get() + " rows", stopWatch.getElapsed(TimeUnit.MILLISECONDS));
session.transfer(outgoing, REL_SUCCESS);
} catch (final ProcessException | SQLException e) {
logger.error("Unable to execute SQL select query {} for {} due to {}; routing to failure", new Object[] { selectQuery, incoming, e });
session.transfer(incoming, REL_FAILURE);
if (incoming == null) {
logger.error("Unable to execute SQL select query {} due to {}. No incoming flow file to route to failure", new Object[] {selectQuery, e});
} else {
logger.error("Unable to execute SQL select query {} for {} due to {}; routing to failure", new Object[] {selectQuery, incoming, e});
session.transfer(incoming, REL_FAILURE);
}
}
}
}

View File

@ -114,6 +114,7 @@ public class PutSyslog extends AbstractSyslogProcessor {
"\"yyyy-MM-dd'T'HH:mm:ss.SZ\" or \"yyyy-MM-dd'T'HH:mm:ss.S+hh:mm\", \" or it can be an RFC3164 timestamp " +
"with a format of \"MMM d HH:mm:ss\".")
.required(true)
.defaultValue("${now():format('MMM d HH:mm:ss')}")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(true)
.build();

View File

@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.util.Map;
public class TestDynamicEnvironment {
public static void main(String[] args) {
// iterate through current environment and print out all properties starting with NIFI
for (Map.Entry<String, String> env: System.getenv().entrySet()) {
if (env.getKey().startsWith("NIFI")) {
System.out.println(env.getKey() + "=" + env.getValue());
}
}
}
}

View File

@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.io.IOException;
public class TestIngestAndUpdate {
public static void main(String[] args) throws IOException {
byte[] bytes = new byte[1024];
System.out.write(System.getProperty("user.dir").getBytes());
System.out.println(":ModifiedResult");
int numRead = 0;
while ((numRead = System.in.read(bytes)) != -1) {
System.out.write(bytes, 0, numRead);
}
System.out.flush();
System.out.close();
}
}

View File

@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
public class TestSuccess {
public static void main(String[] args) {
System.out.println("Test was a success");
}
}