diff --git a/nifi-api/src/main/java/org/apache/nifi/flowfile/FlowFile.java b/nifi-api/src/main/java/org/apache/nifi/flowfile/FlowFile.java index 973ea40a07..1288d212fb 100644 --- a/nifi-api/src/main/java/org/apache/nifi/flowfile/FlowFile.java +++ b/nifi-api/src/main/java/org/apache/nifi/flowfile/FlowFile.java @@ -31,7 +31,12 @@ public interface FlowFile extends Comparable { /** * @return the unique identifier for this flow file + * @deprecated This method has been deprecated in favor of using the attribute + * {@link org.apache.nifi.flowfile.attributes.CoreAttributes.UUID CoreAttributes.UUID}. + * If an identifier is needed use {@link #getAttribute(String)} to retrieve the value for this attribute. + * For example, by calling getAttribute(CoreAttributes.UUID.getKey()). */ + @Deprecated long getId(); /** diff --git a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/AbstractCouchbaseProcessor.java b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/AbstractCouchbaseProcessor.java index b8790417dc..158caa1d67 100644 --- a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/AbstractCouchbaseProcessor.java +++ b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/AbstractCouchbaseProcessor.java @@ -43,21 +43,18 @@ import com.couchbase.client.java.Bucket; */ public abstract class AbstractCouchbaseProcessor extends AbstractProcessor { - public static final PropertyDescriptor DOCUMENT_TYPE = new PropertyDescriptor - .Builder().name("Document Type") - .description("The type of contents.") - .required(true) - .allowableValues(DocumentType.values()) - .defaultValue(DocumentType.Json.toString()) - .build(); + public static final PropertyDescriptor DOCUMENT_TYPE = new PropertyDescriptor.Builder().name("Document Type") + .description("The type of contents.") + .required(true) + .allowableValues(DocumentType.values()) + .defaultValue(DocumentType.Json.toString()) + .build(); - public static final PropertyDescriptor DOC_ID = new PropertyDescriptor - .Builder().name("Document Id") - .description("A static, fixed Couchbase document id." 
- + "Or an expression to construct the Couchbase document id.") - .expressionLanguageSupported(true) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .build(); + public static final PropertyDescriptor DOC_ID = new PropertyDescriptor.Builder().name("Document Id") + .description("A static, fixed Couchbase document id, or an expression to construct the Couchbase document id.") + .expressionLanguageSupported(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); public static final Relationship REL_SUCCESS = new Relationship.Builder() @@ -77,15 +74,13 @@ public abstract class AbstractCouchbaseProcessor extends AbstractProcessor { .description("All FlowFiles that cannot written to Couchbase Server and can't be retried are routed to this relationship.") .build(); - public static final PropertyDescriptor COUCHBASE_CLUSTER_SERVICE = new PropertyDescriptor - .Builder().name("Couchbase Cluster Controller Service") + public static final PropertyDescriptor COUCHBASE_CLUSTER_SERVICE = new PropertyDescriptor.Builder().name("Couchbase Cluster Controller Service") .description("A Couchbase Cluster Controller Service which manages connections to a Couchbase cluster.") .required(true) .identifiesControllerService(CouchbaseClusterControllerService.class) .build(); - public static final PropertyDescriptor BUCKET_NAME = new PropertyDescriptor - .Builder().name("Bucket Name") + public static final PropertyDescriptor BUCKET_NAME = new PropertyDescriptor.Builder().name("Bucket Name") .description("The name of bucket to access.") .required(true) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) @@ -115,6 +110,7 @@ public abstract class AbstractCouchbaseProcessor extends AbstractProcessor { /** * Add processor specific properties. + * * @param descriptors add properties to this list */ protected void addSupportedProperties(List descriptors) { @@ -123,6 +119,7 @@ public abstract class AbstractCouchbaseProcessor extends AbstractProcessor { /** * Add processor specific relationships. + * * @param relationships add relationships to this list */ protected void addSupportedRelationships(Set relationships) { @@ -140,11 +137,11 @@ public abstract class AbstractCouchbaseProcessor extends AbstractProcessor { } private CouchbaseClusterControllerService getClusterService(final ProcessContext context) { - if(clusterService == null){ - synchronized(AbstractCouchbaseProcessor.class){ - if(clusterService == null){ + if (clusterService == null) { + synchronized (AbstractCouchbaseProcessor.class) { + if (clusterService == null) { clusterService = context.getProperty(COUCHBASE_CLUSTER_SERVICE) - .asControllerService(CouchbaseClusterControllerService.class); + .asControllerService(CouchbaseClusterControllerService.class); } } } @@ -154,6 +151,7 @@ public abstract class AbstractCouchbaseProcessor extends AbstractProcessor { /** * Open a bucket connection using a CouchbaseClusterControllerService. + * * @param context a process context * @return a bucket instance */ @@ -163,18 +161,17 @@ public abstract class AbstractCouchbaseProcessor extends AbstractProcessor { /** * Generate a transit url. 
+ * * @param context a process context * @return a transit url based on the bucket name and the CouchbaseClusterControllerService name */ - protected String getTransitUrl(final ProcessContext context) { - return new StringBuilder(context.getProperty(BUCKET_NAME).getValue()) - .append('@') - .append(context.getProperty(COUCHBASE_CLUSTER_SERVICE).getValue()) - .toString(); + protected String getTransitUrl(final ProcessContext context, final String docId) { + return "couchbase://" + context.getProperty(BUCKET_NAME).getValue() + "/" + docId; } /** - * Handles the thrown CocuhbaseException accordingly. + * Handles the thrown CouchbaseException accordingly. + * * @param context a process context * @param session a process session * @param logger a logger @@ -183,35 +180,39 @@ public abstract class AbstractCouchbaseProcessor extends AbstractProcessor { * @param errMsg a message to be logged */ protected void handleCouchbaseException(final ProcessContext context, final ProcessSession session, - final ProcessorLog logger, FlowFile inFile, CouchbaseException e, - String errMsg) { + final ProcessorLog logger, FlowFile inFile, CouchbaseException e, + String errMsg) { logger.error(errMsg, e); - if(inFile != null){ + if (inFile != null) { ErrorHandlingStrategy strategy = CouchbaseExceptionMappings.getStrategy(e); - switch(strategy.penalty()) { - case Penalize: - if(logger.isDebugEnabled()) logger.debug("Penalized: {}", new Object[]{inFile}); - inFile = session.penalize(inFile); - break; - case Yield: - if(logger.isDebugEnabled()) logger.debug("Yielded context: {}", new Object[]{inFile}); - context.yield(); - break; - case None: - break; + switch (strategy.penalty()) { + case Penalize: + if (logger.isDebugEnabled()) { + logger.debug("Penalized: {}", new Object[] {inFile}); + } + inFile = session.penalize(inFile); + break; + case Yield: + if (logger.isDebugEnabled()) { + logger.debug("Yielded context: {}", new Object[] {inFile}); + } + context.yield(); + break; + case None: + break; } - switch(strategy.result()) { - case ProcessException: - throw new ProcessException(errMsg, e); - case Failure: - inFile = session.putAttribute(inFile, CouchbaseAttributes.Exception.key(), e.getClass().getName()); - session.transfer(inFile, REL_FAILURE); - break; - case Retry: - inFile = session.putAttribute(inFile, CouchbaseAttributes.Exception.key(), e.getClass().getName()); - session.transfer(inFile, REL_RETRY); - break; + switch (strategy.result()) { + case ProcessException: + throw new ProcessException(errMsg, e); + case Failure: + inFile = session.putAttribute(inFile, CouchbaseAttributes.Exception.key(), e.getClass().getName()); + session.transfer(inFile, REL_FAILURE); + break; + case Retry: + inFile = session.putAttribute(inFile, CouchbaseAttributes.Exception.key(), e.getClass().getName()); + session.transfer(inFile, REL_RETRY); + break; } } } diff --git a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/CouchbaseExceptionMappings.java b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/CouchbaseExceptionMappings.java index 87ffabb953..e4faba3a2c 100644 --- a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/CouchbaseExceptionMappings.java +++ b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/CouchbaseExceptionMappings.java @@ -78,7 +78,7 @@ public class 
CouchbaseExceptionMappings { mapping.put(ReplicaNotConfiguredException.class, ConfigurationError); // when a particular Service(KV, View, Query, DCP) isn't running in a cluster mapping.put(ServiceNotAvailableException.class, ConfigurationError); - // SSL configuration error, such as key store mis configuration. + // SSL configuration error, such as key store misconfiguration. mapping.put(SSLException.class, ConfigurationError); /* diff --git a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/GetCouchbaseKey.java b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/GetCouchbaseKey.java index 4aa96777cc..b4ff467bdd 100644 --- a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/GetCouchbaseKey.java +++ b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/GetCouchbaseKey.java @@ -16,18 +16,19 @@ */ package org.apache.nifi.processors.couchbase; -import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; +import java.io.OutputStream; import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.TimeUnit; import org.apache.commons.lang3.StringUtils; -import org.apache.nifi.annotation.behavior.ReadsAttribute; -import org.apache.nifi.annotation.behavior.ReadsAttributes; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.documentation.CapabilityDescription; @@ -43,6 +44,7 @@ import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.io.InputStreamCallback; +import org.apache.nifi.processor.io.OutputStreamCallback; import org.apache.nifi.stream.io.StreamUtils; import com.couchbase.client.core.CouchbaseException; @@ -52,31 +54,30 @@ import com.couchbase.client.java.document.Document; import com.couchbase.client.java.document.RawJsonDocument; import com.couchbase.client.java.error.DocumentDoesNotExistException; -@Tags({ "nosql", "couchbase", "database", "get" }) -@CapabilityDescription("Get a document from Couchbase Server via Key/Value access. This processor can be triggered by an incoming FlowFile, or it can be scheduled on a timer") +@Tags({"nosql", "couchbase", "database", "get"}) +@InputRequirement(Requirement.INPUT_REQUIRED) +@CapabilityDescription("Get a document from Couchbase Server via Key/Value access. The ID of the document to fetch may be supplied by setting the <Document Id> property. 
" + + "NOTE: if the Document Id property is not set, the contents of the FlowFile will be read to determine the Document Id, which means that the contents of the entire " + + "FlowFile will be buffered in memory.") @SeeAlso({CouchbaseClusterControllerService.class}) -@ReadsAttributes({ - @ReadsAttribute(attribute = "FlowFile content", description = "Used as a document id if 'Document Id' is not specified"), - @ReadsAttribute(attribute = "*", description = "Any attribute can be used as part of a document id by 'Document Id' expression.") - }) @WritesAttributes({ - @WritesAttribute(attribute="couchbase.cluster", description="Cluster where the document was retrieved from."), - @WritesAttribute(attribute="couchbase.bucket", description="Bucket where the document was retrieved from."), - @WritesAttribute(attribute="couchbase.doc.id", description="Id of the document."), - @WritesAttribute(attribute="couchbase.doc.cas", description="CAS of the document."), - @WritesAttribute(attribute="couchbase.doc.expiry", description="Expiration of the document."), - @WritesAttribute(attribute="couchbase.exception", description="If Couchbase related error occurs the CouchbaseException class name will be captured here.") - }) + @WritesAttribute(attribute = "couchbase.cluster", description = "Cluster where the document was retrieved from."), + @WritesAttribute(attribute = "couchbase.bucket", description = "Bucket where the document was retrieved from."), + @WritesAttribute(attribute = "couchbase.doc.id", description = "Id of the document."), + @WritesAttribute(attribute = "couchbase.doc.cas", description = "CAS of the document."), + @WritesAttribute(attribute = "couchbase.doc.expiry", description = "Expiration of the document."), + @WritesAttribute(attribute = "couchbase.exception", description = "If Couchbase related error occurs the CouchbaseException class name will be captured here.") +}) public class GetCouchbaseKey extends AbstractCouchbaseProcessor { @Override - protected void addSupportedProperties(List descriptors) { + protected void addSupportedProperties(final List descriptors) { descriptors.add(DOCUMENT_TYPE); descriptors.add(DOC_ID); } @Override - protected void addSupportedRelationships(Set relationships) { + protected void addSupportedRelationships(final Set relationships) { relationships.add(REL_SUCCESS); relationships.add(REL_ORIGINAL); relationships.add(REL_RETRY); @@ -85,11 +86,15 @@ public class GetCouchbaseKey extends AbstractCouchbaseProcessor { @Override public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { - final ProcessorLog logger = getLogger(); FlowFile inFile = session.get(); + if (inFile == null) { + return; + } + final long startNanos = System.nanoTime(); + final ProcessorLog logger = getLogger(); String docId = null; - if(!StringUtils.isEmpty(context.getProperty(DOC_ID).getValue())){ + if (!StringUtils.isEmpty(context.getProperty(DOC_ID).getValue())) { docId = context.getProperty(DOC_ID).evaluateAttributeExpressions(inFile).getValue(); } else { final byte[] content = new byte[(int) inFile.getSize()]; @@ -102,63 +107,74 @@ public class GetCouchbaseKey extends AbstractCouchbaseProcessor { docId = new String(content, StandardCharsets.UTF_8); } - if(StringUtils.isEmpty(docId)){ - if(inFile != null){ - throw new ProcessException("Please check 'Document Id' setting. Couldn't get document id from " + inFile); - } + if (StringUtils.isEmpty(docId)) { + throw new ProcessException("Please check 'Document Id' setting. 
Couldn't get document id from " + inFile); } try { - Document doc = null; - byte[] content = null; - Bucket bucket = openBucket(context); - DocumentType documentType = DocumentType.valueOf(context.getProperty(DOCUMENT_TYPE).getValue()); - switch (documentType){ - case Json : { + final Document doc; + final byte[] content; + final Bucket bucket = openBucket(context); + final DocumentType documentType = DocumentType.valueOf(context.getProperty(DOCUMENT_TYPE).getValue()); + + switch (documentType) { + case Json: { RawJsonDocument document = bucket.get(docId, RawJsonDocument.class); - if(document != null){ + if (document == null) { + doc = null; + content = null; + } else { content = document.content().getBytes(StandardCharsets.UTF_8); doc = document; } break; } - case Binary : { + case Binary: { BinaryDocument document = bucket.get(docId, BinaryDocument.class); - if(document != null){ + if (document == null) { + doc = null; + content = null; + } else { content = document.content().array(); doc = document; } break; } + default: { + doc = null; + content = null; + } } - if(doc == null) { - logger.warn("Document {} was not found in {}", new Object[]{docId, getTransitUrl(context)}); - if(inFile != null){ - inFile = session.putAttribute(inFile, CouchbaseAttributes.Exception.key(), DocumentDoesNotExistException.class.getName()); - session.transfer(inFile, REL_FAILURE); - } + if (doc == null) { + logger.error("Document {} was not found in {}; routing {} to failure", new Object[] {docId, getTransitUrl(context, docId), inFile}); + inFile = session.putAttribute(inFile, CouchbaseAttributes.Exception.key(), DocumentDoesNotExistException.class.getName()); + session.transfer(inFile, REL_FAILURE); return; } - if(inFile != null){ - session.transfer(inFile, REL_ORIGINAL); - } + FlowFile outFile = session.create(inFile); + outFile = session.write(outFile, new OutputStreamCallback() { + @Override + public void process(final OutputStream out) throws IOException { + out.write(content); + } + }); - FlowFile outFile = session.create(); - outFile = session.importFrom(new ByteArrayInputStream(content), outFile); - Map updatedAttrs = new HashMap<>(); + final Map updatedAttrs = new HashMap<>(); updatedAttrs.put(CouchbaseAttributes.Cluster.key(), context.getProperty(COUCHBASE_CLUSTER_SERVICE).getValue()); updatedAttrs.put(CouchbaseAttributes.Bucket.key(), context.getProperty(BUCKET_NAME).getValue()); updatedAttrs.put(CouchbaseAttributes.DocId.key(), docId); updatedAttrs.put(CouchbaseAttributes.Cas.key(), String.valueOf(doc.cas())); updatedAttrs.put(CouchbaseAttributes.Expiry.key(), String.valueOf(doc.expiry())); outFile = session.putAllAttributes(outFile, updatedAttrs); - session.getProvenanceReporter().receive(outFile, getTransitUrl(context)); - session.transfer(outFile, REL_SUCCESS); - } catch (CouchbaseException e){ - String errMsg = String.format("Getting docuement %s from Couchbase Server using %s failed due to %s", docId, inFile, e); + final long fetchMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos); + session.getProvenanceReporter().fetch(outFile, getTransitUrl(context, docId), fetchMillis); + session.transfer(outFile, REL_SUCCESS); + session.transfer(inFile, REL_ORIGINAL); + } catch (final CouchbaseException e) { + String errMsg = String.format("Getting document %s from Couchbase Server using %s failed due to %s", docId, inFile, e); handleCouchbaseException(context, session, logger, inFile, e, errMsg); } } diff --git 
a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/PutCouchbaseKey.java b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/PutCouchbaseKey.java index 2aa803c0eb..291c02cb7e 100644 --- a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/PutCouchbaseKey.java +++ b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/PutCouchbaseKey.java @@ -54,39 +54,36 @@ import com.couchbase.client.java.document.BinaryDocument; import com.couchbase.client.java.document.Document; import com.couchbase.client.java.document.RawJsonDocument; -@Tags({ "nosql", "couchbase", "database", "put" }) +@Tags({"nosql", "couchbase", "database", "put"}) @CapabilityDescription("Put a document to Couchbase Server via Key/Value access.") @SeeAlso({CouchbaseClusterControllerService.class}) @ReadsAttributes({ @ReadsAttribute(attribute = "uuid", description = "Used as a document id if 'Document Id' is not specified"), - @ReadsAttribute(attribute = "*", description = "Any attribute can be used as part of a document id by 'Document Id' expression.") - }) +}) @WritesAttributes({ - @WritesAttribute(attribute="couchbase.cluster", description="Cluster where the document was stored."), - @WritesAttribute(attribute="couchbase.bucket", description="Bucket where the document was stored."), - @WritesAttribute(attribute="couchbase.doc.id", description="Id of the document."), - @WritesAttribute(attribute="couchbase.doc.cas", description="CAS of the document."), - @WritesAttribute(attribute="couchbase.doc.expiry", description="Expiration of the document."), - @WritesAttribute(attribute="couchbase.exception", description="If Couchbase related error occurs the CouchbaseException class name will be captured here.") - }) + @WritesAttribute(attribute = "couchbase.cluster", description = "Cluster where the document was stored."), + @WritesAttribute(attribute = "couchbase.bucket", description = "Bucket where the document was stored."), + @WritesAttribute(attribute = "couchbase.doc.id", description = "Id of the document."), + @WritesAttribute(attribute = "couchbase.doc.cas", description = "CAS of the document."), + @WritesAttribute(attribute = "couchbase.doc.expiry", description = "Expiration of the document."), + @WritesAttribute(attribute = "couchbase.exception", description = "If Couchbase related error occurs the CouchbaseException class name will be captured here.") +}) public class PutCouchbaseKey extends AbstractCouchbaseProcessor { - public static final PropertyDescriptor PERSIST_TO = new PropertyDescriptor - .Builder().name("Persist To") - .description("Durability constraint about disk persistence.") - .required(true) - .allowableValues(PersistTo.values()) - .defaultValue(PersistTo.NONE.toString()) - .build(); + public static final PropertyDescriptor PERSIST_TO = new PropertyDescriptor.Builder().name("Persist To") + .description("Durability constraint about disk persistence.") + .required(true) + .allowableValues(PersistTo.values()) + .defaultValue(PersistTo.NONE.toString()) + .build(); - public static final PropertyDescriptor REPLICATE_TO = new PropertyDescriptor - .Builder().name("Replicate To") - .description("Durability constraint about replication.") - .required(true) - .allowableValues(ReplicateTo.values()) - .defaultValue(ReplicateTo.NONE.toString()) - .build(); + public 
static final PropertyDescriptor REPLICATE_TO = new PropertyDescriptor.Builder().name("Replicate To") + .description("Durability constraint about replication.") + .required(true) + .allowableValues(ReplicateTo.values()) + .defaultValue(ReplicateTo.NONE.toString()) + .build(); @Override protected void addSupportedProperties(List descriptors) { @@ -107,7 +104,7 @@ public class PutCouchbaseKey extends AbstractCouchbaseProcessor { public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { final ProcessorLog logger = getLogger(); FlowFile flowFile = session.get(); - if ( flowFile == null ) { + if (flowFile == null) { return; } @@ -119,41 +116,42 @@ public class PutCouchbaseKey extends AbstractCouchbaseProcessor { } }); - String docId = String.valueOf(flowFile.getAttribute(CoreAttributes.UUID.key())); - if(!StringUtils.isEmpty(context.getProperty(DOC_ID).getValue())){ + String docId = flowFile.getAttribute(CoreAttributes.UUID.key()); + if (!StringUtils.isEmpty(context.getProperty(DOC_ID).getValue())) { docId = context.getProperty(DOC_ID).evaluateAttributeExpressions(flowFile).getValue(); } try { Document doc = null; - DocumentType documentType = DocumentType.valueOf(context.getProperty(DOCUMENT_TYPE).getValue()); - switch (documentType){ - case Json : { + final DocumentType documentType = DocumentType.valueOf(context.getProperty(DOCUMENT_TYPE).getValue()); + switch (documentType) { + case Json: { doc = RawJsonDocument.create(docId, new String(content, StandardCharsets.UTF_8)); break; } - case Binary : { - ByteBuf buf = Unpooled.copiedBuffer(content); + case Binary: { + final ByteBuf buf = Unpooled.copiedBuffer(content); doc = BinaryDocument.create(docId, buf); break; } } - PersistTo persistTo = PersistTo.valueOf(context.getProperty(PERSIST_TO).getValue()); - ReplicateTo replicateTo = ReplicateTo.valueOf(context.getProperty(REPLICATE_TO).getValue()); + final PersistTo persistTo = PersistTo.valueOf(context.getProperty(PERSIST_TO).getValue()); + final ReplicateTo replicateTo = ReplicateTo.valueOf(context.getProperty(REPLICATE_TO).getValue()); doc = openBucket(context).upsert(doc, persistTo, replicateTo); - Map updatedAttrs = new HashMap<>(); + + final Map updatedAttrs = new HashMap<>(); updatedAttrs.put(CouchbaseAttributes.Cluster.key(), context.getProperty(COUCHBASE_CLUSTER_SERVICE).getValue()); updatedAttrs.put(CouchbaseAttributes.Bucket.key(), context.getProperty(BUCKET_NAME).getValue()); updatedAttrs.put(CouchbaseAttributes.DocId.key(), docId); updatedAttrs.put(CouchbaseAttributes.Cas.key(), String.valueOf(doc.cas())); updatedAttrs.put(CouchbaseAttributes.Expiry.key(), String.valueOf(doc.expiry())); - flowFile = session.putAllAttributes(flowFile, updatedAttrs); - session.getProvenanceReporter().send(flowFile, getTransitUrl(context)); - session.transfer(flowFile, REL_SUCCESS); - } catch (CouchbaseException e) { - String errMsg = String.format("Writing docuement %s to Couchbase Server using %s failed due to %s", docId, flowFile, e); + flowFile = session.putAllAttributes(flowFile, updatedAttrs); + session.getProvenanceReporter().send(flowFile, getTransitUrl(context, docId)); + session.transfer(flowFile, REL_SUCCESS); + } catch (final CouchbaseException e) { + String errMsg = String.format("Writing document %s to Couchbase Server using %s failed due to %s", docId, flowFile, e); handleCouchbaseException(context, session, logger, flowFile, e, errMsg); } } diff --git 
a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestGetCouchbaseKey.java b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestGetCouchbaseKey.java index 108980c35a..37768261a7 100644 --- a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestGetCouchbaseKey.java +++ b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestGetCouchbaseKey.java @@ -100,10 +100,11 @@ public class TestGetCouchbaseKey { testRunner.setProperty(BUCKET_NAME, bucketName); testRunner.setProperty(DOC_ID, docId); + testRunner.enqueue(new byte[0]); testRunner.run(); - testRunner.assertAllFlowFilesTransferred(REL_SUCCESS); testRunner.assertTransferCount(REL_SUCCESS, 1); + testRunner.assertTransferCount(REL_ORIGINAL, 1); testRunner.assertTransferCount(REL_RETRY, 0); testRunner.assertTransferCount(REL_FAILURE, 0); MockFlowFile outFile = testRunner.getFlowFilesForRelationship(REL_SUCCESS).get(0); @@ -145,7 +146,7 @@ public class TestGetCouchbaseKey { } @Test - public void testDocIdExpWithNullFlowFile() throws Exception { + public void testDocIdExpWithEmptyFlowFile() throws Exception { String docIdExp = "doc-s"; String docId = "doc-s"; @@ -157,10 +158,11 @@ public class TestGetCouchbaseKey { testRunner.setProperty(DOC_ID, docIdExp); + testRunner.enqueue(new byte[0]); testRunner.run(); testRunner.assertTransferCount(REL_SUCCESS, 1); - testRunner.assertTransferCount(REL_ORIGINAL, 0); + testRunner.assertTransferCount(REL_ORIGINAL, 1); testRunner.assertTransferCount(REL_RETRY, 0); testRunner.assertTransferCount(REL_FAILURE, 0); MockFlowFile outFile = testRunner.getFlowFilesForRelationship(REL_SUCCESS).get(0); @@ -179,11 +181,12 @@ public class TestGetCouchbaseKey { setupMockBucket(bucket); testRunner.setProperty(DOC_ID, docIdExp); + testRunner.enqueue(new byte[0]); try { testRunner.run(); fail("Exception should be thrown."); - } catch (AssertionError e){ + } catch (AssertionError e) { Assert.assertTrue(e.getCause().getClass().equals(AttributeExpressionLanguageException.class)); } @@ -210,7 +213,7 @@ public class TestGetCouchbaseKey { try { testRunner.run(); fail("Exception should be thrown."); - } catch (AssertionError e){ + } catch (AssertionError e) { Assert.assertTrue(e.getCause().getClass().equals(AttributeExpressionLanguageException.class)); } @@ -288,7 +291,7 @@ public class TestGetCouchbaseKey { try { testRunner.run(); fail("ProcessException should be thrown."); - } catch (AssertionError e){ + } catch (AssertionError e) { Assert.assertTrue(e.getCause().getClass().equals(ProcessException.class)); } @@ -315,7 +318,7 @@ public class TestGetCouchbaseKey { try { testRunner.run(); fail("ProcessException should be thrown."); - } catch (AssertionError e){ + } catch (AssertionError e) { Assert.assertTrue(e.getCause().getClass().equals(ProcessException.class)); Assert.assertTrue(e.getCause().getCause().getClass().equals(AuthenticationException.class)); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java index ae991c806e..9a439c9829 100644 --- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java @@ -32,8 +32,6 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -80,19 +78,19 @@ public final class StandardFlowFileQueue implements FlowFileQueue { private static final Logger logger = LoggerFactory.getLogger(StandardFlowFileQueue.class); private PriorityQueue activeQueue = null; + + // guarded by lock private ArrayList swapQueue = null; private final AtomicReference size = new AtomicReference<>(new FlowFileQueueSize(0, 0L, 0, 0L, 0, 0L)); private boolean swapMode = false; - private volatile String maximumQueueDataSize; - private volatile long maximumQueueByteCount; - private volatile long maximumQueueObjectCount; + + private final AtomicReference maxQueueSize = new AtomicReference<>(new MaxQueueSize("0 MB", 0L, 0L)); + private final AtomicReference expirationPeriod = new AtomicReference<>(new TimePeriod("0 mins", 0L)); private final EventReporter eventReporter; - private final AtomicLong flowFileExpirationMillis; private final Connection connection; - private final AtomicReference flowFileExpirationPeriod; private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true); private final List priorities; private final int swapThreshold; @@ -106,8 +104,6 @@ public final class StandardFlowFileQueue implements FlowFileQueue { private final ProvenanceEventRepository provRepository; private final ResourceClaimManager resourceClaimManager; - private final AtomicBoolean queueFullRef = new AtomicBoolean(false); - // SCHEDULER CANNOT BE NOTIFIED OF EVENTS WITH THE WRITE LOCK HELD! DOING SO WILL RESULT IN A DEADLOCK! 
private final ProcessScheduler scheduler; @@ -115,11 +111,6 @@ public final class StandardFlowFileQueue implements FlowFileQueue { final ResourceClaimManager resourceClaimManager, final ProcessScheduler scheduler, final FlowFileSwapManager swapManager, final EventReporter eventReporter, final int swapThreshold) { activeQueue = new PriorityQueue<>(20, new Prioritizer(new ArrayList())); priorities = new ArrayList<>(); - maximumQueueObjectCount = 0L; - maximumQueueDataSize = "0 MB"; - maximumQueueByteCount = 0L; - flowFileExpirationMillis = new AtomicLong(0); - flowFileExpirationPeriod = new AtomicReference<>("0 mins"); swapQueue = new ArrayList<>(); this.eventReporter = eventReporter; this.swapManager = swapManager; @@ -161,36 +152,35 @@ public final class StandardFlowFileQueue implements FlowFileQueue { } @Override - public void setBackPressureObjectThreshold(final long maxQueueSize) { - writeLock.lock(); - try { - maximumQueueObjectCount = maxQueueSize; - this.queueFullRef.set(determineIfFull()); - } finally { - writeLock.unlock("setBackPressureObjectThreshold"); + public void setBackPressureObjectThreshold(final long threshold) { + boolean updated = false; + while (!updated) { + MaxQueueSize maxSize = maxQueueSize.get(); + final MaxQueueSize updatedSize = new MaxQueueSize(maxSize.getMaxSize(), maxSize.getMaxBytes(), threshold); + updated = maxQueueSize.compareAndSet(maxSize, updatedSize); } } @Override public long getBackPressureObjectThreshold() { - return maximumQueueObjectCount; + return maxQueueSize.get().getMaxCount(); } @Override public void setBackPressureDataSizeThreshold(final String maxDataSize) { - writeLock.lock(); - try { - maximumQueueByteCount = DataUnit.parseDataSize(maxDataSize, DataUnit.B).longValue(); - maximumQueueDataSize = maxDataSize; - this.queueFullRef.set(determineIfFull()); - } finally { - writeLock.unlock("setBackPressureDataSizeThreshold"); + final long maxBytes = DataUnit.parseDataSize(maxDataSize, DataUnit.B).longValue(); + + boolean updated = false; + while (!updated) { + MaxQueueSize maxSize = maxQueueSize.get(); + final MaxQueueSize updatedSize = new MaxQueueSize(maxDataSize, maxBytes, maxSize.getMaxCount()); + updated = maxQueueSize.compareAndSet(maxSize, updatedSize); } } @Override public String getBackPressureDataSizeThreshold() { - return maximumQueueDataSize; + return maxQueueSize.get().getMaxSize(); } @Override @@ -220,17 +210,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue { @Override public void acknowledge(final FlowFileRecord flowFile) { - if (queueFullRef.get()) { - writeLock.lock(); - try { - incrementUnacknowledgedQueueSize(-1, -flowFile.getSize()); - queueFullRef.set(determineIfFull()); - } finally { - writeLock.unlock("acknowledge(FlowFileRecord)"); - } - } else { - incrementUnacknowledgedQueueSize(-1, -flowFile.getSize()); - } + incrementUnacknowledgedQueueSize(-1, -flowFile.getSize()); if (connection.getSource().getSchedulingStrategy() == SchedulingStrategy.EVENT_DRIVEN) { // queue was full but no longer is. 
Notify that the source may now be available to run, @@ -246,17 +226,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue { totalSize += flowFile.getSize(); } - if (queueFullRef.get()) { - writeLock.lock(); - try { - incrementUnacknowledgedQueueSize(-flowFiles.size(), -totalSize); - queueFullRef.set(determineIfFull()); - } finally { - writeLock.unlock("acknowledge(FlowFileRecord)"); - } - } else { - incrementUnacknowledgedQueueSize(-flowFiles.size(), -totalSize); - } + incrementUnacknowledgedQueueSize(-flowFiles.size(), -totalSize); if (connection.getSource().getSchedulingStrategy() == SchedulingStrategy.EVENT_DRIVEN) { // it's possible that queue was full but no longer is. Notify that the source may now be available to run, @@ -267,33 +237,26 @@ public final class StandardFlowFileQueue implements FlowFileQueue { @Override public boolean isFull() { - return queueFullRef.get(); - } + final MaxQueueSize maxSize = maxQueueSize.get(); - /** - * MUST be called with either the read or write lock held - * - * @return true if full - */ - private boolean determineIfFull() { - final long maxSize = maximumQueueObjectCount; - final long maxBytes = maximumQueueByteCount; - if (maxSize <= 0 && maxBytes <= 0) { + // Check if max size is set + if (maxSize.getMaxBytes() <= 0 && maxSize.getMaxCount() <= 0) { return false; } final QueueSize queueSize = getQueueSize(); - if (maxSize > 0 && queueSize.getObjectCount() >= maxSize) { + if (maxSize.getMaxCount() > 0 && queueSize.getObjectCount() >= maxSize.getMaxCount()) { return true; } - if (maxBytes > 0 && queueSize.getByteCount() >= maxBytes) { + if (maxSize.getMaxBytes() > 0 && queueSize.getByteCount() >= maxSize.getMaxBytes()) { return true; } return false; } + @Override public void put(final FlowFileRecord file) { writeLock.lock(); @@ -307,8 +270,6 @@ public final class StandardFlowFileQueue implements FlowFileQueue { incrementActiveQueueSize(1, file.getSize()); activeQueue.add(file); } - - queueFullRef.set(determineIfFull()); } finally { writeLock.unlock("put(FlowFileRecord)"); } @@ -337,8 +298,6 @@ public final class StandardFlowFileQueue implements FlowFileQueue { incrementActiveQueueSize(numFiles, bytes); activeQueue.addAll(files); } - - queueFullRef.set(determineIfFull()); } finally { writeLock.unlock("putAll"); } @@ -374,7 +333,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue { FlowFileRecord flowFile = null; // First check if we have any records Pre-Fetched. - final long expirationMillis = flowFileExpirationMillis.get(); + final long expirationMillis = expirationPeriod.get().getMillis(); writeLock.lock(); try { flowFile = doPoll(expiredRecords, expirationMillis); @@ -393,10 +352,8 @@ public final class StandardFlowFileQueue implements FlowFileQueue { boolean isExpired; migrateSwapToActive(); - final boolean queueFullAtStart = queueFullRef.get(); long expiredBytes = 0L; - do { flowFile = this.activeQueue.poll(); @@ -424,13 +381,6 @@ public final class StandardFlowFileQueue implements FlowFileQueue { incrementActiveQueueSize(-expiredRecords.size(), -expiredBytes); } - // if at least 1 FlowFile was expired & the queue was full before we started, then - // we need to determine whether or not the queue is full again. If no FlowFile was expired, - // then the queue will still be full until the appropriate #acknowledge method is called. 
- if (queueFullAtStart && !expiredRecords.isEmpty()) { - queueFullRef.set(determineIfFull()); - } - return flowFile; } @@ -451,8 +401,6 @@ public final class StandardFlowFileQueue implements FlowFileQueue { private void doPoll(final List records, int maxResults, final Set expiredRecords) { migrateSwapToActive(); - final boolean queueFullAtStart = queueFullRef.get(); - final long bytesDrained = drainQueue(activeQueue, records, maxResults, expiredRecords); long expiredBytes = 0L; @@ -462,13 +410,6 @@ public final class StandardFlowFileQueue implements FlowFileQueue { incrementActiveQueueSize(-(expiredRecords.size() + records.size()), -bytesDrained); incrementUnacknowledgedQueueSize(records.size(), bytesDrained - expiredBytes); - - // if at least 1 FlowFile was expired & the queue was full before we started, then - // we need to determine whether or not the queue is full again. If no FlowFile was expired, - // then the queue will still be full until the appropriate #acknowledge method is called. - if (queueFullAtStart && !expiredRecords.isEmpty()) { - queueFullRef.set(determineIfFull()); - } } /** @@ -660,7 +601,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue { long drainedSize = 0L; FlowFileRecord pulled = null; - final long expirationMillis = this.flowFileExpirationMillis.get(); + final long expirationMillis = expirationPeriod.get().getMillis(); while (destination.size() < maxResults && (pulled = sourceQueue.poll()) != null) { if (isLaterThan(getExpirationDate(pulled, expirationMillis))) { expiredRecords.add(pulled); @@ -688,8 +629,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue { try { migrateSwapToActive(); - final long expirationMillis = this.flowFileExpirationMillis.get(); - final boolean queueFullAtStart = queueFullRef.get(); + final long expirationMillis = expirationPeriod.get().getMillis(); final List selectedFlowFiles = new ArrayList<>(); final List unselected = new ArrayList<>(); @@ -734,17 +674,10 @@ public final class StandardFlowFileQueue implements FlowFileQueue { } this.activeQueue.addAll(unselected); - - // if at least 1 FlowFile was expired & the queue was full before we started, then - // we need to determine whether or not the queue is full again. If no FlowFile was expired, - // then the queue will still be full until the appropriate #acknowledge method is called. 
- if (queueFullAtStart && !expiredRecords.isEmpty()) { - queueFullRef.set(determineIfFull()); - } + incrementActiveQueueSize(-flowFilesPulled, -bytesPulled); return selectedFlowFiles; } finally { - incrementActiveQueueSize(-flowFilesPulled, -bytesPulled); writeLock.unlock("poll(Filter, Set)"); } } @@ -817,12 +750,12 @@ public final class StandardFlowFileQueue implements FlowFileQueue { @Override public String getFlowFileExpiration() { - return flowFileExpirationPeriod.get(); + return expirationPeriod.get().getPeriod(); } @Override public int getFlowFileExpiration(final TimeUnit timeUnit) { - return (int) timeUnit.convert(flowFileExpirationMillis.get(), TimeUnit.MILLISECONDS); + return (int) timeUnit.convert(expirationPeriod.get().getMillis(), TimeUnit.MILLISECONDS); } @Override @@ -831,8 +764,8 @@ public final class StandardFlowFileQueue implements FlowFileQueue { if (millis < 0) { throw new IllegalArgumentException("FlowFile Expiration Period must be positive"); } - this.flowFileExpirationPeriod.set(flowExpirationPeriod); - this.flowFileExpirationMillis.set(millis); + + expirationPeriod.set(new TimePeriod(flowExpirationPeriod, millis)); } @@ -1287,4 +1220,57 @@ public final class StandardFlowFileQueue implements FlowFileQueue { " Bytes], Unacknowledged=[" + unacknowledgedCount + ", " + unacknowledgedBytes + " Bytes] ]"; } } + + + private static class MaxQueueSize { + private final String maxSize; + private final long maxBytes; + private final long maxCount; + + public MaxQueueSize(final String maxSize, final long maxBytes, final long maxCount) { + this.maxSize = maxSize; + this.maxBytes = maxBytes; + this.maxCount = maxCount; + } + + public String getMaxSize() { + return maxSize; + } + + public long getMaxBytes() { + return maxBytes; + } + + public long getMaxCount() { + return maxCount; + } + + @Override + public String toString() { + return maxCount + " Objects/" + maxSize; + } + } + + private static class TimePeriod { + private final String period; + private final long millis; + + public TimePeriod(final String period, final long millis) { + this.period = period; + this.millis = millis; + } + + public String getPeriod() { + return period; + } + + public long getMillis() { + return millis; + } + + @Override + public String toString() { + return period; + } + } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java index 8b8c678672..09ac7f2a27 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java @@ -18,6 +18,7 @@ package org.apache.nifi.controller; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; @@ -48,18 +49,28 @@ import org.apache.nifi.controller.repository.claim.ResourceClaimManager; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.FlowFilePrioritizer; import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.processor.FlowFileFilter; +import 
org.apache.nifi.provenance.ProvenanceEventRecord; import org.apache.nifi.provenance.ProvenanceEventRepository; +import org.apache.nifi.provenance.ProvenanceEventType; import org.apache.nifi.provenance.StandardProvenanceEventRecord; import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; public class TestStandardFlowFileQueue { private TestSwapManager swapManager = null; private StandardFlowFileQueue queue = null; + private List provRecords = new ArrayList<>(); + @Before + @SuppressWarnings("unchecked") public void setup() { + provRecords.clear(); + final Connection connection = Mockito.mock(Connection.class); Mockito.when(connection.getSource()).thenReturn(Mockito.mock(Connectable.class)); Mockito.when(connection.getDestination()).thenReturn(Mockito.mock(Connectable.class)); @@ -72,6 +83,16 @@ public class TestStandardFlowFileQueue { final ResourceClaimManager claimManager = Mockito.mock(ResourceClaimManager.class); Mockito.when(provRepo.eventBuilder()).thenReturn(new StandardProvenanceEventRecord.Builder()); + Mockito.doAnswer(new Answer() { + @Override + public Object answer(final InvocationOnMock invocation) throws Throwable { + final Iterable iterable = (Iterable) invocation.getArguments()[0]; + for (final ProvenanceEventRecord record : iterable) { + provRecords.add(record); + } + return null; + } + }).when(provRepo).registerEvents(Mockito.any(Iterable.class)); queue = new StandardFlowFileQueue("id", connection, flowFileRepo, provRepo, claimManager, scheduler, swapManager, null, 10000); TestFlowFile.idGenerator.set(0L); @@ -106,6 +127,160 @@ public class TestStandardFlowFileQueue { assertEquals(0L, unackSize.getByteCount()); } + @Test + public void testBackPressure() { + queue.setBackPressureObjectThreshold(10); + + assertTrue(queue.isEmpty()); + assertTrue(queue.isActiveQueueEmpty()); + assertFalse(queue.isFull()); + + for (int i = 0; i < 9; i++) { + queue.put(new TestFlowFile()); + assertFalse(queue.isFull()); + assertFalse(queue.isEmpty()); + assertFalse(queue.isActiveQueueEmpty()); + } + + queue.put(new TestFlowFile()); + assertTrue(queue.isFull()); + assertFalse(queue.isEmpty()); + assertFalse(queue.isActiveQueueEmpty()); + + final Set expiredRecords = new HashSet<>(); + final FlowFileRecord polled = queue.poll(expiredRecords); + assertNotNull(polled); + assertTrue(expiredRecords.isEmpty()); + + assertFalse(queue.isEmpty()); + assertFalse(queue.isActiveQueueEmpty()); + + // queue is still full because FlowFile has not yet been acknowledged. + assertTrue(queue.isFull()); + queue.acknowledge(polled); + + // FlowFile has been acknowledged; queue should no longer be full. 
+ assertFalse(queue.isFull()); + assertFalse(queue.isEmpty()); + assertFalse(queue.isActiveQueueEmpty()); + } + + @Test + public void testBackPressureAfterPollFilter() throws InterruptedException { + queue.setBackPressureObjectThreshold(10); + queue.setFlowFileExpiration("10 millis"); + + for (int i = 0; i < 9; i++) { + queue.put(new TestFlowFile()); + assertFalse(queue.isFull()); + } + + queue.put(new TestFlowFile()); + assertTrue(queue.isFull()); + + Thread.sleep(100L); + + + final FlowFileFilter filter = new FlowFileFilter() { + @Override + public FlowFileFilterResult filter(final FlowFile flowFile) { + return FlowFileFilterResult.REJECT_AND_CONTINUE; + } + }; + + final Set expiredRecords = new HashSet<>(); + final List polled = queue.poll(filter, expiredRecords); + assertTrue(polled.isEmpty()); + assertEquals(10, expiredRecords.size()); + + assertFalse(queue.isFull()); + assertTrue(queue.isEmpty()); + assertTrue(queue.isActiveQueueEmpty()); + } + + @Test(timeout = 10000) + public void testBackPressureAfterDrop() throws InterruptedException { + queue.setBackPressureObjectThreshold(10); + queue.setFlowFileExpiration("10 millis"); + + for (int i = 0; i < 9; i++) { + queue.put(new TestFlowFile()); + assertFalse(queue.isFull()); + } + + queue.put(new TestFlowFile()); + assertTrue(queue.isFull()); + + Thread.sleep(100L); + + final String requestId = UUID.randomUUID().toString(); + final DropFlowFileStatus status = queue.dropFlowFiles(requestId, "Unit Test"); + + while (status.getState() != DropFlowFileState.COMPLETE) { + Thread.sleep(10L); + } + + assertFalse(queue.isFull()); + assertTrue(queue.isEmpty()); + assertTrue(queue.isActiveQueueEmpty()); + + assertEquals(10, provRecords.size()); + for (final ProvenanceEventRecord event : provRecords) { + assertNotNull(event); + assertEquals(ProvenanceEventType.DROP, event.getEventType()); + } + } + + @Test + public void testBackPressureAfterPollSingle() throws InterruptedException { + queue.setBackPressureObjectThreshold(10); + queue.setFlowFileExpiration("10 millis"); + + for (int i = 0; i < 9; i++) { + queue.put(new TestFlowFile()); + assertFalse(queue.isFull()); + } + + queue.put(new TestFlowFile()); + assertTrue(queue.isFull()); + + Thread.sleep(100L); + + final Set expiredRecords = new HashSet<>(); + final FlowFileRecord polled = queue.poll(expiredRecords); + assertNull(polled); + assertEquals(10, expiredRecords.size()); + + assertFalse(queue.isFull()); + assertTrue(queue.isEmpty()); + assertTrue(queue.isActiveQueueEmpty()); + } + + @Test + public void testBackPressureAfterPollMultiple() throws InterruptedException { + queue.setBackPressureObjectThreshold(10); + queue.setFlowFileExpiration("10 millis"); + + for (int i = 0; i < 9; i++) { + queue.put(new TestFlowFile()); + assertFalse(queue.isFull()); + } + + queue.put(new TestFlowFile()); + assertTrue(queue.isFull()); + + Thread.sleep(100L); + + final Set expiredRecords = new HashSet<>(); + final List polled = queue.poll(10, expiredRecords); + assertTrue(polled.isEmpty()); + assertEquals(10, expiredRecords.size()); + + assertFalse(queue.isFull()); + assertTrue(queue.isEmpty()); + assertTrue(queue.isActiveQueueEmpty()); + } + @Test public void testSwapOutOccurs() { for (int i = 0; i < 10000; i++) { diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java index 
a67a9fd0fe..9e89c3a1e6 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java @@ -133,7 +133,7 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor { .description("Kerberos keytab associated with the principal. Requires nifi.kerberos.krb5.file to be set " + "in your nifi.properties").addValidator(Validator.VALID) .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR).addValidator(KERBEROS_CONFIG_VALIDATOR).build(); - private static final PropertyDescriptor KERBEROS_RELOGIN_PERIOD = new PropertyDescriptor.Builder().name("Kerberos Relogin Period").required(false) + public static final PropertyDescriptor KERBEROS_RELOGIN_PERIOD = new PropertyDescriptor.Builder().name("Kerberos Relogin Period").required(false) .description("Period of time which should pass before attempting a kerberos relogin").defaultValue("4 hours") .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR).addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java index c27ade9a7a..6434e5eb02 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java @@ -83,6 +83,9 @@ public class FetchHDFS extends AbstractHadoopProcessor { final List properties = new ArrayList<>(); properties.add(HADOOP_CONFIGURATION_RESOURCES); properties.add(FILENAME); + properties.add(KERBEROS_PRINCIPAL); + properties.add(KERBEROS_KEYTAB); + properties.add(KERBEROS_RELOGIN_PERIOD); return properties; } diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java index 563bda8b6d..0fae4ca1e9 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java @@ -138,6 +138,9 @@ public class ListHDFS extends AbstractHadoopProcessor { properties.add(DISTRIBUTED_CACHE_SERVICE); properties.add(DIRECTORY); properties.add(RECURSE_SUBDIRS); + properties.add(KERBEROS_PRINCIPAL); + properties.add(KERBEROS_KEYTAB); + properties.add(KERBEROS_RELOGIN_PERIOD); return properties; } diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/pom.xml b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/pom.xml index c2e86fec07..1a8dc9d048 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/pom.xml +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/pom.xml @@ -34,6 +34,11 @@ org.apache.nifi nifi-utils + + org.apache.kafka + kafka-clients + 0.8.2.2 + org.apache.kafka kafka_2.9.1 diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java index 09025a4c96..ea7f7bbb11 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java @@ -21,30 +21,47 @@ import java.io.InputStream; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; import java.util.HashSet; import java.util.List; import java.util.Properties; import java.util.Set; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import java.util.regex.Pattern; +import org.apache.kafka.clients.producer.BufferExhaustedException; +import org.apache.kafka.clients.producer.Callback; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.TriggerWhenEmpty; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.AbstractSessionFactoryProcessor; import org.apache.nifi.processor.DataUnit; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessSessionFactory; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.io.InputStreamCallback; @@ -56,21 +73,20 @@ import org.apache.nifi.stream.io.StreamUtils; import org.apache.nifi.stream.io.util.NonThreadSafeCircularBuffer; import org.apache.nifi.util.LongHolder; -import kafka.javaapi.producer.Producer; -import kafka.producer.KeyedMessage; -import kafka.producer.ProducerConfig; import scala.actors.threadpool.Arrays; @SupportsBatching @InputRequirement(Requirement.INPUT_REQUIRED) -@Tags({ "Apache", "Kafka", "Put", "Send", "Message", "PubSub" }) -@CapabilityDescription("Sends the contents of a FlowFile as a message to Apache Kafka") -public class PutKafka extends AbstractProcessor { +@Tags({"Apache", "Kafka", "Put", "Send", "Message", "PubSub"}) +@CapabilityDescription("Sends the contents of a FlowFile as a message to Apache Kafka. 
The messages to send may be individual FlowFiles or may be delimited, using a " + + "user-specified delimiter, such as a new-line.") +@TriggerWhenEmpty // because we have a queue of sessions that are ready to be committed +public class PutKafka extends AbstractSessionFactoryProcessor { private static final String SINGLE_BROKER_REGEX = ".*?\\:\\d{3,5}"; private static final String BROKER_REGEX = SINGLE_BROKER_REGEX + "(?:,\\s*" + SINGLE_BROKER_REGEX + ")*"; - public static final AllowableValue DELIVERY_REPLICATED = new AllowableValue("-1", "Guarantee Replicated Delivery", "FlowFile will be routed to" + public static final AllowableValue DELIVERY_REPLICATED = new AllowableValue("all", "Guarantee Replicated Delivery", "FlowFile will be routed to" + " failure unless the message is replicated to the appropriate number of Kafka Nodes according to the Topic configuration"); public static final AllowableValue DELIVERY_ONE_NODE = new AllowableValue("1", "Guarantee Single Node Delivery", "FlowFile will be routed" + " to success if the message is received by a single Kafka node, whether or not it is replicated. This is faster than" @@ -79,16 +95,6 @@ public class PutKafka extends AbstractProcessor { + " successfully writing the content to a Kafka node, without waiting for a response. This provides the best performance but may result" + " in data loss."); - /** - * AllowableValue for a Producer Type that synchronously sends messages to Kafka - */ - public static final AllowableValue PRODUCTER_TYPE_SYNCHRONOUS = new AllowableValue("sync", "Synchronous", "Send FlowFiles to Kafka immediately."); - - /** - * AllowableValue for a Producer Type that asynchronously sends messages to Kafka - */ - public static final AllowableValue PRODUCTER_TYPE_ASYNCHRONOUS = new AllowableValue("async", "Asynchronous", "Batch messages before sending them to Kafka." - + " While this will improve throughput, it opens the possibility that a failure on the client machine will drop unsent data."); /** * AllowableValue for sending messages to Kafka without compression @@ -105,6 +111,13 @@ public class PutKafka extends AbstractProcessor { */ public static final AllowableValue COMPRESSION_CODEC_SNAPPY = new AllowableValue("snappy", "Snappy", "Compress messages using Snappy"); + static final AllowableValue ROUND_ROBIN_PARTITIONING = new AllowableValue("Round Robin", "Round Robin", + "Messages will be assigned partitions in a round-robin fashion, sending the first message to Partition 1, the next message to Partition 2, and so on, wrapping as necessary."); + static final AllowableValue RANDOM_PARTITIONING = new AllowableValue("Random Robin", "Random", + "Messages will be assigned to random partitions."); + static final AllowableValue USER_DEFINED_PARTITIONING = new AllowableValue("User-Defined", "User-Defined", + "The 'Partition' property will be used to determine the partition. 
All messages within the same FlowFile will be assigned to the same partition."); + public static final PropertyDescriptor SEED_BROKERS = new PropertyDescriptor.Builder() .name("Known Brokers") @@ -120,6 +133,21 @@ public class PutKafka extends AbstractProcessor { .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .expressionLanguageSupported(true) .build(); + static final PropertyDescriptor PARTITION_STRATEGY = new PropertyDescriptor.Builder() + .name("Partition Strategy") + .description("Specifies how messages should be partitioned when sent to Kafka") + .allowableValues(ROUND_ROBIN_PARTITIONING, RANDOM_PARTITIONING, USER_DEFINED_PARTITIONING) + .defaultValue(ROUND_ROBIN_PARTITIONING.getValue()) + .required(true) + .build(); + public static final PropertyDescriptor PARTITION = new PropertyDescriptor.Builder() + .name("Partition") + .description("Specifies which Kafka Partition to add the message to. If using a message delimiter, all messages in the same FlowFile will be sent to the same partition. " + + "If a partition is specified but is not valid, then all messages within the same FlowFile will use the same partition but it remains undefined which partition is used.") + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .expressionLanguageSupported(true) + .required(false) + .build(); public static final PropertyDescriptor KEY = new PropertyDescriptor.Builder() .name("Kafka Key") .description("The Key to use for the Message") @@ -140,7 +168,10 @@ public class PutKafka extends AbstractProcessor { .description("Specifies the delimiter to use for splitting apart multiple messages within a single FlowFile. " + "If not specified, the entire content of the FlowFile will be used as a single message. " + "If specified, the contents of the FlowFile will be split on this delimiter and each section " - + "sent as a separate Kafka message.") + + "sent as a separate Kafka message. 
Note that if messages are delimited and some messages for a given FlowFile " + + "are transferred successfully while others are not, the messages will be split into individual FlowFiles, such that those " + + "messages that were successfully sent are routed to the 'success' relationship while other messages are sent to the 'failure' " + + "relationship.") .required(false) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .expressionLanguageSupported(true) @@ -151,6 +182,13 @@ public class PutKafka extends AbstractProcessor { .required(true) .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) .expressionLanguageSupported(false) + .defaultValue("5 MB") + .build(); + static final PropertyDescriptor MAX_RECORD_SIZE = new PropertyDescriptor.Builder() + .name("Max Record Size") + .description("The maximum size that any individual record can be.") + .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) + .required(true) .defaultValue("1 MB") .build(); public static final PropertyDescriptor TIMEOUT = new PropertyDescriptor.Builder() @@ -168,20 +206,10 @@ public class PutKafka extends AbstractProcessor { .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .expressionLanguageSupported(false) .build(); - public static final PropertyDescriptor PRODUCER_TYPE = new PropertyDescriptor.Builder() - .name("Producer Type") - .description("This parameter specifies whether the messages are sent asynchronously in a background thread.") - .required(true) - .allowableValues(PRODUCTER_TYPE_SYNCHRONOUS, PRODUCTER_TYPE_ASYNCHRONOUS) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(false) - .defaultValue(PRODUCTER_TYPE_SYNCHRONOUS.getValue()) - .build(); public static final PropertyDescriptor BATCH_NUM_MESSAGES = new PropertyDescriptor.Builder() .name("Async Batch Size") - .description("Used only if Producer Type is set to \"" + PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() + "\"." - + " The number of messages to send in one batch when using " + PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() + " mode." - + " The producer will wait until either this number of messages are ready" + .displayName("Batch Size") + .description("The number of messages to send in one batch. The producer will wait until either this number of messages are ready" + " to send or \"Queue Buffering Max Time\" is reached.") .required(true) .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) @@ -189,35 +217,13 @@ public class PutKafka extends AbstractProcessor { .build(); public static final PropertyDescriptor QUEUE_BUFFERING_MAX = new PropertyDescriptor.Builder() .name("Queue Buffering Max Time") - .description("Used only if Producer Type is set to \"" + PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() + "\"." - + " Maximum time to buffer data when using " + PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() + " mode. For example a setting of 100 ms" - + " will try to batch together 100ms of messages to send at once. This will improve" + .description("Maximum time to buffer data before sending to Kafka. For example a setting of 100 ms" + + " will try to batch together 100 milliseconds' worth of messages to send at once. 
This will improve" + " throughput but adds message delivery latency due to the buffering.") .required(true) .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) .defaultValue("5 secs") .build(); - public static final PropertyDescriptor QUEUE_BUFFERING_MAX_MESSAGES = new PropertyDescriptor.Builder() - .name("Queue Buffer Max Count") - .description("Used only if Producer Type is set to \"" + PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() + "\"." - + " The maximum number of unsent messages that can be queued up in the producer when" - + " using " + PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() + " mode before either the producer must be blocked or data must be dropped.") - .required(true) - .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) - .defaultValue("10000") - .build(); - public static final PropertyDescriptor QUEUE_ENQUEUE_TIMEOUT = new PropertyDescriptor.Builder() - .name("Queue Enqueue Timeout") - .description("Used only if Producer Type is set to \"" + PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() + "\"." - + " The amount of time to block before dropping messages when running in " - + PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() + " mode" - + " and the buffer has reached the \"Queue Buffer Max Count\". If set to 0, events will" - + " be enqueued immediately or dropped if the queue is full (the producer send call will" - + " never block). If not set, the producer will block indefinitely and never willingly" - + " drop a send.") - .required(false) - .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) - .build(); public static final PropertyDescriptor COMPRESSION_CODEC = new PropertyDescriptor.Builder() .name("Compression Codec") .description("This parameter allows you to specify the compression codec for all" @@ -227,16 +233,6 @@ public class PutKafka extends AbstractProcessor { .allowableValues(COMPRESSION_CODEC_NONE, COMPRESSION_CODEC_GZIP, COMPRESSION_CODEC_SNAPPY) .defaultValue(COMPRESSION_CODEC_NONE.getValue()) .build(); - public static final PropertyDescriptor COMPRESSED_TOPICS = new PropertyDescriptor.Builder() - .name("Compressed Topics") - .description("This parameter allows you to set whether compression should be turned on" - + " for particular topics. If the compression codec is anything other than" - + " \"" + COMPRESSION_CODEC_NONE.getDisplayName() + "\", enable compression only for specified topics if any." - + " If the list of compressed topics is empty, then enable the specified" - + " compression codec for all topics. 
If the compression codec is " + COMPRESSION_CODEC_NONE.getDisplayName() + "," - + " compression is disabled for all topics") - .required(false) - .build(); public static final Relationship REL_SUCCESS = new Relationship.Builder() .name("success") @@ -247,7 +243,13 @@ public class PutKafka extends AbstractProcessor { .description("Any FlowFile that cannot be sent to Kafka will be routed to this Relationship") .build(); - private final BlockingQueue> producers = new LinkedBlockingQueue<>(); + private static final Pattern NUMBER_PATTERN = Pattern.compile("-?\\d+"); + private final BlockingQueue completeBatches = new LinkedBlockingQueue<>(); + private final Set activeBatches = Collections.synchronizedSet(new HashSet()); + + private final ConcurrentMap partitionIndexMap = new ConcurrentHashMap<>(); + + private volatile Producer producer; @Override protected List getSupportedPropertyDescriptors() { @@ -259,36 +261,21 @@ public class PutKafka extends AbstractProcessor { final List props = new ArrayList<>(); props.add(SEED_BROKERS); props.add(TOPIC); + props.add(PARTITION_STRATEGY); + props.add(PARTITION); props.add(KEY); props.add(DELIVERY_GUARANTEE); props.add(MESSAGE_DELIMITER); props.add(MAX_BUFFER_SIZE); + props.add(MAX_RECORD_SIZE); props.add(TIMEOUT); - props.add(PRODUCER_TYPE); props.add(BATCH_NUM_MESSAGES); - props.add(QUEUE_BUFFERING_MAX_MESSAGES); props.add(QUEUE_BUFFERING_MAX); - props.add(QUEUE_ENQUEUE_TIMEOUT); props.add(COMPRESSION_CODEC); - props.add(COMPRESSED_TOPICS); props.add(clientName); return props; } - @Override - public Collection customValidate(final ValidationContext context) { - final List errors = new ArrayList<>(super.customValidate(context)); - - final Integer batchMessages = context.getProperty(BATCH_NUM_MESSAGES).asInteger(); - final Integer bufferMaxMessages = context.getProperty(QUEUE_BUFFERING_MAX_MESSAGES).asInteger(); - - if (batchMessages > bufferMaxMessages) { - errors.add(new ValidationResult.Builder().subject("Batch Size, Queue Buffer").valid(false) - .explanation("Batch Size (" + batchMessages + ") must be equal to or less than the Queue Buffer Max Count (" + bufferMaxMessages + ")").build()); - } - - return errors; - } @Override public Set getRelationships() { @@ -298,71 +285,131 @@ public class PutKafka extends AbstractProcessor { return relationships; } - @OnStopped - public void closeProducers() { - Producer producer; + @Override + protected Collection customValidate(final ValidationContext validationContext) { + final List results = new ArrayList<>(); - while ((producer = producers.poll()) != null) { + final String partitionStrategy = validationContext.getProperty(PARTITION_STRATEGY).getValue(); + if (partitionStrategy.equalsIgnoreCase(USER_DEFINED_PARTITIONING.getValue()) && !validationContext.getProperty(PARTITION).isSet()) { + results.add(new ValidationResult.Builder().subject("Partition").valid(false).explanation( + "The property must be set when configured to use the User-Defined Partitioning Strategy").build()); + } + + return results; + } + + protected Producer getProducer() { + return producer; + } + + @OnStopped + public void cleanup() { + final Producer producer = getProducer(); + if (producer != null) { producer.close(); } + + for (final FlowFileMessageBatch batch : activeBatches) { + batch.cancelOrComplete(); + } } - protected ProducerConfig createConfig(final ProcessContext context) { + @OnScheduled + public void createProducer(final ProcessContext context) { + producer = new KafkaProducer(createConfig(context), new 
ByteArraySerializer(), new ByteArraySerializer()); + } + + protected int getActiveMessageBatchCount() { + return activeBatches.size(); + } + + protected int getCompleteMessageBatchCount() { + return completeBatches.size(); + } + + protected Properties createConfig(final ProcessContext context) { final String brokers = context.getProperty(SEED_BROKERS).getValue(); final Properties properties = new Properties(); - properties.setProperty("metadata.broker.list", brokers); - properties.setProperty("request.required.acks", context.getProperty(DELIVERY_GUARANTEE).getValue()); + properties.setProperty("bootstrap.servers", brokers); + properties.setProperty("acks", context.getProperty(DELIVERY_GUARANTEE).getValue()); properties.setProperty("client.id", context.getProperty(CLIENT_NAME).getValue()); - properties.setProperty("request.timeout.ms", String.valueOf(context.getProperty(TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).longValue())); - properties.setProperty("message.send.max.retries", "1"); - properties.setProperty("producer.type", context.getProperty(PRODUCER_TYPE).getValue()); - properties.setProperty("batch.num.messages", context.getProperty(BATCH_NUM_MESSAGES).getValue()); + final String timeout = String.valueOf(context.getProperty(TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).longValue()); + properties.setProperty("timeout.ms", timeout); + properties.setProperty("metadata.fetch.timeout.ms", timeout); + + properties.setProperty("batch.size", context.getProperty(BATCH_NUM_MESSAGES).getValue()); + properties.setProperty("max.request.size", String.valueOf(context.getProperty(MAX_RECORD_SIZE).asDataSize(DataUnit.B).longValue())); + + final long maxBufferSize = context.getProperty(MAX_BUFFER_SIZE).asDataSize(DataUnit.B).longValue(); + properties.setProperty("buffer.memory", String.valueOf(maxBufferSize)); + + final String compressionCodec = context.getProperty(COMPRESSION_CODEC).getValue(); + properties.setProperty("compression.type", compressionCodec); final Long queueBufferingMillis = context.getProperty(QUEUE_BUFFERING_MAX).asTimePeriod(TimeUnit.MILLISECONDS); if (queueBufferingMillis != null) { - properties.setProperty("queue.buffering.max.ms", String.valueOf(queueBufferingMillis)); - } - properties.setProperty("queue.buffering.max.messages", context.getProperty(QUEUE_BUFFERING_MAX_MESSAGES).getValue()); - - final Long queueEnqueueTimeoutMillis = context.getProperty(QUEUE_ENQUEUE_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS); - if (queueEnqueueTimeoutMillis != null) { - properties.setProperty("queue.enqueue.timeout.ms", String.valueOf(queueEnqueueTimeoutMillis)); + properties.setProperty("linger.ms", String.valueOf(queueBufferingMillis)); } - final String compressionCodec = context.getProperty(COMPRESSION_CODEC).getValue(); - properties.setProperty("compression.codec", compressionCodec); + properties.setProperty("retries", "0"); + properties.setProperty("block.on.buffer.full", "false"); - final String compressedTopics = context.getProperty(COMPRESSED_TOPICS).getValue(); - if (compressedTopics != null) { - properties.setProperty("compressed.topics", compressedTopics); + return properties; + } + + private Integer getPartition(final ProcessContext context, final FlowFile flowFile, final String topic) { + final long unnormalizedIndex; + + final String partitionStrategy = context.getProperty(PARTITION_STRATEGY).getValue(); + if (partitionStrategy.equalsIgnoreCase(ROUND_ROBIN_PARTITIONING.getValue())) { + AtomicLong partitionIndex = partitionIndexMap.get(topic); + if (partitionIndex == null) { + 
partitionIndex = new AtomicLong(0L); + final AtomicLong existing = partitionIndexMap.putIfAbsent(topic, partitionIndex); + if (existing != null) { + partitionIndex = existing; + } + } + + unnormalizedIndex = partitionIndex.getAndIncrement(); + } else if (partitionStrategy.equalsIgnoreCase(RANDOM_PARTITIONING.getValue())) { + return null; + } else { + if (context.getProperty(PARTITION).isSet()) { + final String partitionValue = context.getProperty(PARTITION).evaluateAttributeExpressions(flowFile).getValue(); + + if (NUMBER_PATTERN.matcher(partitionValue).matches()) { + // Subtract 1 because if the partition is "3" then we want to get index 2 into the List of partitions. + unnormalizedIndex = Long.parseLong(partitionValue) - 1; + } else { + unnormalizedIndex = partitionValue.hashCode(); + } + } else { + return null; + } } - return new ProducerConfig(properties); - } - - protected Producer createProducer(final ProcessContext context) { - return new Producer<>(createConfig(context)); - } - - private Producer borrowProducer(final ProcessContext context) { - final Producer producer = producers.poll(); - return producer == null ? createProducer(context) : producer; - } - - private void returnProducer(final Producer producer) { - producers.offer(producer); + final Producer producer = getProducer(); + final List partitionInfos = producer.partitionsFor(topic); + final int partitionIdx = (int) (unnormalizedIndex % partitionInfos.size()); + return partitionInfos.get(partitionIdx).partition(); } @Override - public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException { + FlowFileMessageBatch batch; + while ((batch = completeBatches.poll()) != null) { + batch.completeSession(); + } + + final ProcessSession session = sessionFactory.createSession(); final FlowFile flowFile = session.get(); if (flowFile == null) { return; } - final long start = System.nanoTime(); final String topic = context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue(); final String key = context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue(); final byte[] keyBytes = key == null ? null : key.getBytes(StandardCharsets.UTF_8); @@ -371,8 +418,7 @@ public class PutKafka extends AbstractProcessor { delimiter = delimiter.replace("\\n", "\n").replace("\\r", "\r").replace("\\t", "\t"); } - final long maxBufferSize = context.getProperty(MAX_BUFFER_SIZE).asDataSize(DataUnit.B).longValue(); - final Producer producer = borrowProducer(context); + final Producer producer = getProducer(); if (delimiter == null) { // Send the entire FlowFile as a single message. 
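The getPartition() logic above keeps one AtomicLong per topic and maps the counter onto whatever partitions Producer#partitionsFor(topic) reports. The following is a minimal, self-contained sketch of that round-robin pattern, not the processor's actual code; the PartitionPicker class name is invented for illustration, and the only kafka-clients call it assumes is the Producer#partitionsFor(String) method the patch itself uses.

import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.PartitionInfo;

// Hypothetical helper illustrating the per-topic round-robin counter used by getPartition().
public class PartitionPicker {

    private final ConcurrentMap<String, AtomicLong> counters = new ConcurrentHashMap<>();

    // Returns the next partition id for the topic, wrapping around the partition list reported by the producer.
    public int nextPartition(final Producer<byte[], byte[]> producer, final String topic) {
        AtomicLong counter = counters.get(topic);
        if (counter == null) {
            final AtomicLong newCounter = new AtomicLong(0L);
            final AtomicLong existing = counters.putIfAbsent(topic, newCounter);
            counter = (existing == null) ? newCounter : existing;
        }

        final List<PartitionInfo> partitions = producer.partitionsFor(topic);
        final int index = (int) (counter.getAndIncrement() % partitions.size());
        return partitions.get(index).partition();
    }
}

Because the chosen partition depends on the ordering returned by partitionsFor(), the unit tests further down stub that call with a fixed three-partition list so the round-robin assignments stay predictable.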
@@ -384,31 +430,38 @@ public class PutKafka extends AbstractProcessor { } }); - boolean error = false; + final Integer partition; try { - final KeyedMessage message; - if (key == null) { - message = new KeyedMessage<>(topic, value); - } else { - message = new KeyedMessage<>(topic, keyBytes, value); - } - - producer.send(message); - final long nanos = System.nanoTime() - start; - - session.getProvenanceReporter().send(flowFile, "kafka://" + topic); - session.transfer(flowFile, REL_SUCCESS); - getLogger().info("Successfully sent {} to Kafka in {} millis", new Object[] { flowFile, TimeUnit.NANOSECONDS.toMillis(nanos) }); + partition = getPartition(context, flowFile, topic); } catch (final Exception e) { - getLogger().error("Failed to send {} to Kafka due to {}; routing to failure", new Object[] { flowFile, e }); + getLogger().error("Failed to obtain a partition for {} due to {}", new Object[] {flowFile, e}); session.transfer(session.penalize(flowFile), REL_FAILURE); - error = true; - } finally { - if (error) { - producer.close(); - } else { - returnProducer(producer); - } + session.commit(); + return; + } + + final ProducerRecord producerRecord = new ProducerRecord<>(topic, partition, keyBytes, value); + + final FlowFileMessageBatch messageBatch = new FlowFileMessageBatch(session, flowFile, topic); + messageBatch.setNumMessages(1); + activeBatches.add(messageBatch); + + try { + producer.send(producerRecord, new Callback() { + @Override + public void onCompletion(final RecordMetadata metadata, final Exception exception) { + if (exception == null) { + // record was successfully sent. + messageBatch.addSuccessfulRange(0L, flowFile.getSize(), metadata.offset()); + } else { + messageBatch.addFailedRange(0L, flowFile.getSize(), exception); + } + } + }); + } catch (final BufferExhaustedException bee) { + messageBatch.addFailedRange(0L, flowFile.getSize(), bee); + context.yield(); + return; } } else { final byte[] delimiterBytes = delimiter.getBytes(StandardCharsets.UTF_8); @@ -418,9 +471,9 @@ public class PutKafka extends AbstractProcessor { // the stream of bytes in the FlowFile final NonThreadSafeCircularBuffer buffer = new NonThreadSafeCircularBuffer(delimiterBytes); - boolean error = false; - final LongHolder lastMessageOffset = new LongHolder(0L); final LongHolder messagesSent = new LongHolder(0L); + final FlowFileMessageBatch messageBatch = new FlowFileMessageBatch(session, flowFile, topic); + activeBatches.add(messageBatch); try (final ByteArrayOutputStream baos = new ByteArrayOutputStream()) { session.read(flowFile, new InputStreamCallback() { @@ -430,13 +483,12 @@ public class PutKafka extends AbstractProcessor { boolean streamFinished = false; - final List> messages = new ArrayList<>(); // batch to send - long messageBytes = 0L; // size of messages in the 'messages' list - int nextByte; try (final InputStream bufferedIn = new BufferedInputStream(rawIn); final ByteCountingInputStream in = new ByteCountingInputStream(bufferedIn)) { + long messageStartOffset = in.getBytesConsumed(); + // read until we're out of data. while (!streamFinished) { nextByte = in.read(); @@ -457,107 +509,309 @@ public class PutKafka extends AbstractProcessor { } if (data != null) { + final long messageEndOffset = in.getBytesConsumed(); + // If the message has no data, ignore it. if (data.length != 0) { - // either we ran out of data or we reached the end of the message. - // Either way, create the message because it's ready to send. 
- final KeyedMessage message; - if (key == null) { - message = new KeyedMessage<>(topic, data); - } else { - message = new KeyedMessage<>(topic, keyBytes, data); + final Integer partition; + try { + partition = getPartition(context, flowFile, topic); + } catch (final Exception e) { + messageBatch.addFailedRange(messageStartOffset, messageEndOffset, e); + getLogger().error("Failed to obtain a partition for {} due to {}", new Object[] {flowFile, e}); + continue; } - // Add the message to the list of messages ready to send. If we've reached our - // threshold of how many we're willing to send (or if we're out of data), go ahead - // and send the whole List. - messages.add(message); - messageBytes += data.length; - if (messageBytes >= maxBufferSize || streamFinished) { - // send the messages, then reset our state. - try { - producer.send(messages); - } catch (final Exception e) { - // we wrap the general exception in ProcessException because we want to separate - // failures in sending messages from general Exceptions that would indicate bugs - // in the Processor. Failure to send a message should be handled appropriately, but - // we don't want to catch the general Exception or RuntimeException in order to catch - // failures from Kafka's Producer. - throw new ProcessException("Failed to send messages to Kafka", e); - } - messagesSent.addAndGet(messages.size()); // count number of messages sent + final ProducerRecord producerRecord = new ProducerRecord<>(topic, partition, keyBytes, data); + final long rangeStart = messageStartOffset; - // reset state - messages.clear(); - messageBytes = 0; + try { + producer.send(producerRecord, new Callback() { + @Override + public void onCompletion(final RecordMetadata metadata, final Exception exception) { + if (exception == null) { + // record was successfully sent. + messageBatch.addSuccessfulRange(rangeStart, messageEndOffset, metadata.offset()); + } else { + messageBatch.addFailedRange(rangeStart, messageEndOffset, exception); + } + } + }); - // We've successfully sent a batch of messages. Keep track of the byte offset in the - // FlowFile of the last successfully sent message. This way, if the messages cannot - // all be successfully sent, we know where to split off the data. This allows us to then - // split off the first X number of bytes and send to 'success' and then split off the rest - // and send them to 'failure'. - lastMessageOffset.set(in.getBytesConsumed()); + messagesSent.incrementAndGet(); + } catch (final BufferExhaustedException bee) { + // Not enough room in the buffer. Add from the beginning of this message to end of FlowFile as a failed range + messageBatch.addFailedRange(messageStartOffset, flowFile.getSize(), bee); + context.yield(); + return; } + } + // reset BAOS so that we can start a new message. 
baos.reset(); data = null; - - } - } - - // If there are messages left, send them - if (!messages.isEmpty()) { - try { - messagesSent.addAndGet(messages.size()); // add count of messages - producer.send(messages); - } catch (final Exception e) { - throw new ProcessException("Failed to send messages to Kafka", e); + messageStartOffset = in.getBytesConsumed(); } } } } }); - final long nanos = System.nanoTime() - start; - session.getProvenanceReporter().send(flowFile, "kafka://" + topic, "Sent " + messagesSent.get() + " messages"); - session.transfer(flowFile, REL_SUCCESS); - getLogger().info("Successfully sent {} messages to Kafka for {} in {} millis", new Object[] { messagesSent.get(), flowFile, TimeUnit.NANOSECONDS.toMillis(nanos) }); - } catch (final ProcessException pe) { - error = true; - - // There was a failure sending messages to Kafka. Iff the lastMessageOffset is 0, then all of them failed and we can - // just route the FlowFile to failure. Otherwise, some messages were successful, so split them off and send them to - // 'success' while we send the others to 'failure'. - final long offset = lastMessageOffset.get(); - if (offset == 0L) { - // all of the messages failed to send. Route FlowFile to failure - getLogger().error("Failed to send {} to Kafka due to {}; routing to fialure", new Object[] { flowFile, pe.getCause() }); - session.transfer(session.penalize(flowFile), REL_FAILURE); - } else { - // Some of the messages were sent successfully. We want to split off the successful messages from the failed messages. - final FlowFile successfulMessages = session.clone(flowFile, 0L, offset); - final FlowFile failedMessages = session.clone(flowFile, offset, flowFile.getSize() - offset); - - getLogger().error("Successfully sent {} of the messages from {} but then failed to send the rest. Original FlowFile split into" - + " two: {} routed to 'success', {} routed to 'failure'. 
Failure was due to {}", new Object[] { - messagesSent.get(), flowFile, successfulMessages, failedMessages, pe.getCause() }); - - session.transfer(successfulMessages, REL_SUCCESS); - session.transfer(session.penalize(failedMessages), REL_FAILURE); - session.remove(flowFile); - session.getProvenanceReporter().send(successfulMessages, "kafka://" + topic); - } - } finally { - if (error) { - producer.close(); - } else { - returnProducer(producer); - } + messageBatch.setNumMessages(messagesSent.get()); } - } } + + private static class Range { + private final long start; + private final long end; + private final Long kafkaOffset; + + public Range(final long start, final long end, final Long kafkaOffset) { + this.start = start; + this.end = end; + this.kafkaOffset = kafkaOffset; + } + + public long getStart() { + return start; + } + + public long getEnd() { + return end; + } + + public Long getKafkaOffset() { + return kafkaOffset; + } + + @Override + public String toString() { + return "Range[" + start + "-" + end + "]"; + } + } + + private class FlowFileMessageBatch { + private final ProcessSession session; + private final FlowFile flowFile; + private final String topic; + private final long startTime = System.nanoTime(); + + private final List successfulRanges = new ArrayList<>(); + private final List failedRanges = new ArrayList<>(); + + private Exception lastFailureReason; + private long numMessages = -1L; + private long completeTime = 0L; + private boolean canceled = false; + + public FlowFileMessageBatch(final ProcessSession session, final FlowFile flowFile, final String topic) { + this.session = session; + this.flowFile = flowFile; + this.topic = topic; + } + + public synchronized void cancelOrComplete() { + if (isComplete()) { + completeSession(); + return; + } + + this.canceled = true; + + session.rollback(); + successfulRanges.clear(); + failedRanges.clear(); + } + + public synchronized void addSuccessfulRange(final long start, final long end, final long kafkaOffset) { + if (canceled) { + return; + } + + successfulRanges.add(new Range(start, end, kafkaOffset)); + + if (isComplete()) { + activeBatches.remove(this); + completeBatches.add(this); + completeTime = System.nanoTime(); + } + } + + public synchronized void addFailedRange(final long start, final long end, final Exception e) { + if (canceled) { + return; + } + + failedRanges.add(new Range(start, end, null)); + lastFailureReason = e; + + if (isComplete()) { + activeBatches.remove(this); + completeBatches.add(this); + completeTime = System.nanoTime(); + } + } + + private boolean isComplete() { + return !canceled && (numMessages > -1) && (successfulRanges.size() + failedRanges.size() >= numMessages); + } + + public synchronized void setNumMessages(final long msgCount) { + this.numMessages = msgCount; + + if (isComplete()) { + activeBatches.remove(this); + completeBatches.add(this); + completeTime = System.nanoTime(); + } + } + + private Long getMin(final Long a, final Long b) { + if (a == null && b == null) { + return null; + } + + if (a == null) { + return b; + } + + if (b == null) { + return a; + } + + return Math.min(a, b); + } + + private Long getMax(final Long a, final Long b) { + if (a == null && b == null) { + return null; + } + + if (a == null) { + return b; + } + + if (b == null) { + return a; + } + + return Math.max(a, b); + } + + private void transferRanges(final List ranges, final Relationship relationship) { + Collections.sort(ranges, new Comparator() { + @Override + public int compare(final Range o1, final Range o2) { + 
return Long.compare(o1.getStart(), o2.getStart()); + } + }); + + for (int i = 0; i < ranges.size(); i++) { + Range range = ranges.get(i); + int count = 1; + Long smallestKafkaOffset = range.getKafkaOffset(); + Long largestKafkaOffset = range.getKafkaOffset(); + + while (i + 1 < ranges.size()) { + // Check if the next range in the List continues where this one left off. + final Range nextRange = ranges.get(i + 1); + + if (nextRange.getStart() == range.getEnd()) { + // We have two ranges in a row that are contiguous; combine them into a single Range. + range = new Range(range.getStart(), nextRange.getEnd(), null); + + smallestKafkaOffset = getMin(smallestKafkaOffset, nextRange.getKafkaOffset()); + largestKafkaOffset = getMax(largestKafkaOffset, nextRange.getKafkaOffset()); + count++; + i++; + } else { + break; + } + } + + // Create a FlowFile for this range. + FlowFile child = session.clone(flowFile, range.getStart(), range.getEnd() - range.getStart()); + if (relationship == REL_SUCCESS) { + session.getProvenanceReporter().send(child, getTransitUri(), "Sent " + count + " messages; Kafka offsets range from " + smallestKafkaOffset + " to " + largestKafkaOffset); + session.transfer(child, relationship); + } else { + session.transfer(session.penalize(child), relationship); + } + } + } + + private String getTransitUri() { + final List partitions = getProducer().partitionsFor(topic); + if (partitions.isEmpty()) { + return "kafka://unknown-host" + "/topics/" + topic; + } + + final PartitionInfo info = partitions.get(0); + final Node leader = info.leader(); + final String host = leader.host(); + final int port = leader.port(); + + return "kafka://" + host + ":" + port + "/topics/" + topic; + } + + public synchronized void completeSession() { + if (canceled) { + return; + } + + if (successfulRanges.isEmpty() && failedRanges.isEmpty()) { + getLogger().info("Completed processing {} but sent 0 FlowFiles to Kafka", new Object[] {flowFile}); + session.transfer(flowFile, REL_SUCCESS); + session.commit(); + return; + } + + if (successfulRanges.isEmpty()) { + getLogger().error("Failed to send {} to Kafka; routing to 'failure'; last failure reason reported was {};", new Object[] {flowFile, lastFailureReason}); + session.transfer(session.penalize(flowFile), REL_FAILURE); + session.commit(); + return; + } + + if (failedRanges.isEmpty()) { + final long transferMillis = TimeUnit.NANOSECONDS.toMillis(completeTime - startTime); + + if (successfulRanges.size() == 1) { + final Long kafkaOffset = successfulRanges.get(0).getKafkaOffset(); + final String msg = "Sent 1 message" + ((kafkaOffset == null) ? 
"" : ("; Kafka offset = " + kafkaOffset)); + session.getProvenanceReporter().send(flowFile, getTransitUri(), msg); + } else { + long smallestKafkaOffset = successfulRanges.get(0).getKafkaOffset(); + long largestKafkaOffset = successfulRanges.get(0).getKafkaOffset(); + + for (final Range range : successfulRanges) { + smallestKafkaOffset = Math.min(smallestKafkaOffset, range.getKafkaOffset()); + largestKafkaOffset = Math.max(largestKafkaOffset, range.getKafkaOffset()); + } + + session.getProvenanceReporter().send(flowFile, getTransitUri(), + "Sent " + successfulRanges.size() + " messages; Kafka offsets range from " + smallestKafkaOffset + " to " + largestKafkaOffset); + } + + session.transfer(flowFile, REL_SUCCESS); + getLogger().info("Successfully sent {} messages to Kafka for {} in {} millis", new Object[] {successfulRanges.size(), flowFile, transferMillis}); + session.commit(); + return; + } + + // At this point, the successful ranges is not empty and the failed ranges is not empty. This indicates that some messages made their way to Kafka + // successfully and some failed. We will address this by splitting apart the source FlowFile into children and sending the successful messages to 'success' + // and the failed messages to 'failure'. + transferRanges(successfulRanges, REL_SUCCESS); + transferRanges(failedRanges, REL_FAILURE); + session.remove(flowFile); + getLogger().error("Successfully sent {} messages to Kafka but failed to send {} messages; the last error received was {}", + new Object[] {successfulRanges.size(), failedRanges.size(), lastFailureReason}); + session.commit(); + } + } } diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java index 750d40691e..17d1cc831a 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java @@ -22,27 +22,33 @@ import static org.junit.Assert.assertTrue; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.Future; -import kafka.common.FailedToSendMessageException; -import kafka.javaapi.producer.Producer; -import kafka.message.CompressionCodec; -import kafka.producer.KeyedMessage; -import kafka.producer.ProducerConfig; - -import org.apache.nifi.annotation.lifecycle.OnScheduled; -import org.apache.nifi.processor.ProcessContext; +import org.apache.kafka.clients.producer.BufferExhaustedException; +import org.apache.kafka.clients.producer.Callback; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.Metric; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.nifi.provenance.ProvenanceEventRecord; import org.apache.nifi.provenance.ProvenanceEventType; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; 
import org.apache.nifi.util.TestRunners; -import org.junit.Ignore; +import org.junit.Assert; import org.junit.Test; -import scala.collection.Seq; +import kafka.common.FailedToSendMessageException; + public class TestPutKafka { @@ -56,24 +62,19 @@ public class TestPutKafka { runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n"); runner.enqueue("Hello World\nGoodbye\n1\n2\n3\n4\n5\n6\n7\n8\n9".getBytes()); - runner.run(); + runner.run(2); // we have to run twice because the first iteration will result in data being added to a queue in the processor; the second onTrigger call will transfer FlowFiles. runner.assertAllFlowFilesTransferred(PutKafka.REL_SUCCESS, 1); - final List messages = proc.getProducer().getMessages(); + final List> messages = ((MockProducer) proc.getProducer()).getMessages(); assertEquals(11, messages.size()); - assertTrue(Arrays.equals("Hello World".getBytes(StandardCharsets.UTF_8), messages.get(0))); - assertTrue(Arrays.equals("Goodbye".getBytes(StandardCharsets.UTF_8), messages.get(1))); - assertTrue(Arrays.equals("1".getBytes(StandardCharsets.UTF_8), messages.get(2))); - assertTrue(Arrays.equals("2".getBytes(StandardCharsets.UTF_8), messages.get(3))); - assertTrue(Arrays.equals("3".getBytes(StandardCharsets.UTF_8), messages.get(4))); - assertTrue(Arrays.equals("4".getBytes(StandardCharsets.UTF_8), messages.get(5))); - assertTrue(Arrays.equals("5".getBytes(StandardCharsets.UTF_8), messages.get(6))); - assertTrue(Arrays.equals("6".getBytes(StandardCharsets.UTF_8), messages.get(7))); - assertTrue(Arrays.equals("7".getBytes(StandardCharsets.UTF_8), messages.get(8))); - assertTrue(Arrays.equals("8".getBytes(StandardCharsets.UTF_8), messages.get(9))); - assertTrue(Arrays.equals("9".getBytes(StandardCharsets.UTF_8), messages.get(10))); + assertTrue(Arrays.equals("Hello World".getBytes(StandardCharsets.UTF_8), messages.get(0).value())); + assertTrue(Arrays.equals("Goodbye".getBytes(StandardCharsets.UTF_8), messages.get(1).value())); + + for (int i = 1; i <= 9; i++) { + assertTrue(Arrays.equals(String.valueOf(i).getBytes(StandardCharsets.UTF_8), messages.get(i + 1).value())); + } } @Test @@ -87,7 +88,7 @@ public class TestPutKafka { final String text = "Hello World\nGoodbye\n1\n2\n3\n4\n5\n6\n7\n8\n9"; runner.enqueue(text.getBytes()); - runner.run(); + runner.run(2); runner.assertAllFlowFilesTransferred(PutKafka.REL_FAILURE, 1); final MockFlowFile mff = runner.getFlowFilesForRelationship(PutKafka.REL_FAILURE).get(0); @@ -96,7 +97,7 @@ public class TestPutKafka { @Test public void testPartialFailure() { - final TestableProcessor proc = new TestableProcessor(2); + final TestableProcessor proc = new TestableProcessor(2); // fail after sending 2 messages. 
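// A rough aside on the runner.run(2) calls in these tests: because PutKafka now extends AbstractSessionFactoryProcessor,
// the first onTrigger only hands records to the asynchronous producer, and the completion callbacks queue a finished
// FlowFileMessageBatch; the second onTrigger drains that queue and transfers the FlowFiles. The toy class below is a
// sketch of that send-then-drain shape only; TwoPassTriggerDemo is an invented name and stands in for the real batch
// and callback machinery rather than reproducing it.

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Illustrative only: models why a single trigger is not enough to see transferred FlowFiles.
public class TwoPassTriggerDemo {

    private final BlockingQueue<String> completed = new LinkedBlockingQueue<>();

    // Each "trigger" first drains work finished by earlier callbacks, then submits new work.
    public void trigger(final String incoming) {
        String done;
        while ((done = completed.poll()) != null) {
            System.out.println("transferring " + done); // PutKafka transfers the batch's FlowFiles at this point
        }

        if (incoming != null) {
            completed.add(incoming); // in PutKafka this enqueue happens inside the Kafka send Callback
        }
    }

    public static void main(final String[] args) {
        final TwoPassTriggerDemo demo = new TwoPassTriggerDemo();
        demo.trigger("message-1"); // first run: sends, nothing transferred yet
        demo.trigger(null);        // second run: "message-1" is transferred
    }
}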
final TestRunner runner = TestRunners.newTestRunner(proc); runner.setProperty(PutKafka.TOPIC, "topic1"); runner.setProperty(PutKafka.KEY, "key1"); @@ -106,7 +107,7 @@ public class TestPutKafka { final byte[] bytes = "1\n2\n3\n4".getBytes(); runner.enqueue(bytes); - runner.run(); + runner.run(2); runner.assertTransferCount(PutKafka.REL_SUCCESS, 1); runner.assertTransferCount(PutKafka.REL_FAILURE, 1); @@ -118,6 +119,39 @@ public class TestPutKafka { failureFF.assertContentEquals("3\n4"); } + @Test + public void testPartialFailureWithSuccessBeforeAndAfter() { + final TestableProcessor proc = new TestableProcessor(2, 4); // fail after sending 2 messages, then stop failing after 4 + final TestRunner runner = TestRunners.newTestRunner(proc); + runner.setProperty(PutKafka.TOPIC, "topic1"); + runner.setProperty(PutKafka.KEY, "key1"); + runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234"); + runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n"); + runner.setProperty(PutKafka.MAX_BUFFER_SIZE, "1 B"); + + final byte[] bytes = "1\n2\n3\n4\n5\n6".getBytes(); + runner.enqueue(bytes); + runner.run(2); + + runner.assertTransferCount(PutKafka.REL_SUCCESS, 2); + runner.assertTransferCount(PutKafka.REL_FAILURE, 1); + + final List success = runner.getFlowFilesForRelationship(PutKafka.REL_SUCCESS); + for (final MockFlowFile successFF : success) { + if ('1' == successFF.toByteArray()[0]) { + successFF.assertContentEquals("1\n2\n"); + } else if ('5' == successFF.toByteArray()[0]) { + successFF.assertContentEquals("5\n6"); + } else { + Assert.fail("Wrong content for FlowFile; contained " + new String(successFF.toByteArray())); + } + } + + final MockFlowFile failureFF = runner.getFlowFilesForRelationship(PutKafka.REL_FAILURE).get(0); + failureFF.assertContentEquals("3\n4\n"); + } + + @Test public void testWithEmptyMessages() { final TestableProcessor proc = new TestableProcessor(); @@ -129,16 +163,16 @@ public class TestPutKafka { final byte[] bytes = "\n\n\n1\n2\n\n\n\n3\n4\n\n\n".getBytes(); runner.enqueue(bytes); - runner.run(); + runner.run(2); runner.assertAllFlowFilesTransferred(PutKafka.REL_SUCCESS, 1); - final List msgs = proc.getProducer().getMessages(); + final List> msgs = ((MockProducer) proc.getProducer()).getMessages(); assertEquals(4, msgs.size()); - assertTrue(Arrays.equals("1".getBytes(), msgs.get(0))); - assertTrue(Arrays.equals("2".getBytes(), msgs.get(1))); - assertTrue(Arrays.equals("3".getBytes(), msgs.get(2))); - assertTrue(Arrays.equals("4".getBytes(), msgs.get(3))); + + for (int i = 1; i <= 4; i++) { + assertTrue(Arrays.equals(String.valueOf(i).getBytes(), msgs.get(i - 1).value())); + } } @Test @@ -154,14 +188,14 @@ public class TestPutKafka { final byte[] bytes = "\n\n\n1\n2\n\n\n\n3\n4\n\n\n".getBytes(); runner.enqueue(bytes); - runner.run(); + runner.run(2); final List events = runner.getProvenanceEvents(); assertEquals(1, events.size()); final ProvenanceEventRecord event = events.get(0); assertEquals(ProvenanceEventType.SEND, event.getEventType()); - assertEquals("kafka://topic1", event.getTransitUri()); - assertEquals("Sent 4 messages", event.getDetails()); + assertEquals("kafka://localhost:1111/topics/topic1", event.getTransitUri()); + assertTrue(event.getDetails().startsWith("Sent 4 messages")); } @Test @@ -175,265 +209,270 @@ public class TestPutKafka { final byte[] bytes = "\n\n\n1\n2\n\n\n\n3\n4\n\n\n".getBytes(); runner.enqueue(bytes); - runner.run(); + runner.run(2); final List events = runner.getProvenanceEvents(); assertEquals(1, events.size()); final 
ProvenanceEventRecord event = events.get(0); assertEquals(ProvenanceEventType.SEND, event.getEventType()); - assertEquals("kafka://topic1", event.getTransitUri()); + assertEquals("kafka://localhost:1111/topics/topic1", event.getTransitUri()); } @Test - @Ignore("Intended only for local testing; requires an actual running instance of Kafka & ZooKeeper...") - public void testKeyValuePut() { - final TestRunner runner = TestRunners.newTestRunner(PutKafka.class); - runner.setProperty(PutKafka.SEED_BROKERS, "192.168.0.101:9092"); - runner.setProperty(PutKafka.TOPIC, "${kafka.topic}"); - runner.setProperty(PutKafka.KEY, "${kafka.key}"); - runner.setProperty(PutKafka.TIMEOUT, "3 secs"); - runner.setProperty(PutKafka.DELIVERY_GUARANTEE, PutKafka.DELIVERY_REPLICATED.getValue()); + public void testRoundRobinAcrossMultipleMessages() { + final TestableProcessor proc = new TestableProcessor(); - keyValuePutExecute(runner); - } + final TestRunner runner = TestRunners.newTestRunner(proc); + runner.setProperty(PutKafka.TOPIC, "topic1"); + runner.setProperty(PutKafka.KEY, "key1"); + runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234"); + runner.setProperty(PutKafka.PARTITION_STRATEGY, PutKafka.ROUND_ROBIN_PARTITIONING); - @Test - @Ignore("Intended only for local testing; requires an actual running instance of Kafka & ZooKeeper...") - public void testKeyValuePutAsync() { - final TestRunner runner = TestRunners.newTestRunner(PutKafka.class); - runner.setProperty(PutKafka.SEED_BROKERS, "192.168.0.101:9092"); - runner.setProperty(PutKafka.TOPIC, "${kafka.topic}"); - runner.setProperty(PutKafka.KEY, "${kafka.key}"); - runner.setProperty(PutKafka.TIMEOUT, "3 secs"); - runner.setProperty(PutKafka.PRODUCER_TYPE, PutKafka.PRODUCTER_TYPE_ASYNCHRONOUS.getValue()); - runner.setProperty(PutKafka.DELIVERY_GUARANTEE, PutKafka.DELIVERY_REPLICATED.getValue()); - - keyValuePutExecute(runner); - } - - private void keyValuePutExecute(final TestRunner runner) { - final Map attributes = new HashMap<>(); - attributes.put("kafka.topic", "test"); - attributes.put("kafka.key", "key3"); - - final byte[] data = "Hello, World, Again! 
;)".getBytes(); - runner.enqueue(data, attributes); - runner.enqueue(data, attributes); - runner.enqueue(data, attributes); - runner.enqueue(data, attributes); + runner.enqueue("hello".getBytes()); + runner.enqueue("there".getBytes()); + runner.enqueue("how are you".getBytes()); + runner.enqueue("today".getBytes()); runner.run(5); runner.assertAllFlowFilesTransferred(PutKafka.REL_SUCCESS, 4); - final List mffs = runner.getFlowFilesForRelationship(PutKafka.REL_SUCCESS); - final MockFlowFile mff = mffs.get(0); - assertTrue(Arrays.equals(data, mff.toByteArray())); + final List> records = ((MockProducer) proc.getProducer()).getMessages(); + for (int i = 0; i < 3; i++) { + assertEquals(i + 1, records.get(i).partition().intValue()); + } + + assertEquals(1, records.get(3).partition().intValue()); } @Test - public void testProducerConfigDefault() { - - final TestableProcessor processor = new TestableProcessor(); - final TestRunner runner = TestRunners.newTestRunner(processor); + public void testRoundRobinAcrossMultipleMessagesInSameFlowFile() { + final TestableProcessor proc = new TestableProcessor(); + final TestRunner runner = TestRunners.newTestRunner(proc); runner.setProperty(PutKafka.TOPIC, "topic1"); runner.setProperty(PutKafka.KEY, "key1"); runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234"); - runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n"); + runner.setProperty(PutKafka.PARTITION_STRATEGY, PutKafka.ROUND_ROBIN_PARTITIONING); + runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\n"); - final ProcessContext context = runner.getProcessContext(); - final ProducerConfig config = processor.createConfig(context); + runner.enqueue("hello\nthere\nhow are you\ntoday".getBytes()); - // Check the codec - final CompressionCodec codec = config.compressionCodec(); - assertTrue(codec instanceof kafka.message.NoCompressionCodec$); + runner.run(2); - // Check compressed topics - final Seq compressedTopics = config.compressedTopics(); - assertEquals(0, compressedTopics.size()); + runner.assertAllFlowFilesTransferred(PutKafka.REL_SUCCESS, 1); - // Check the producer type - final String actualProducerType = config.producerType(); - assertEquals(PutKafka.PRODUCER_TYPE.getDefaultValue(), actualProducerType); + final List> records = ((MockProducer) proc.getProducer()).getMessages(); + for (int i = 0; i < 3; i++) { + assertEquals(i + 1, records.get(i).partition().intValue()); + } + assertEquals(1, records.get(3).partition().intValue()); } + @Test - public void testProducerConfigAsyncWithCompression() { - - final TestableProcessor processor = new TestableProcessor(); - final TestRunner runner = TestRunners.newTestRunner(processor); + public void testUserDefinedPartition() { + final TestableProcessor proc = new TestableProcessor(); + final TestRunner runner = TestRunners.newTestRunner(proc); runner.setProperty(PutKafka.TOPIC, "topic1"); runner.setProperty(PutKafka.KEY, "key1"); runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234"); - runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n"); - runner.setProperty(PutKafka.PRODUCER_TYPE, PutKafka.PRODUCTER_TYPE_ASYNCHRONOUS.getValue()); - runner.setProperty(PutKafka.COMPRESSION_CODEC, PutKafka.COMPRESSION_CODEC_SNAPPY.getValue()); - runner.setProperty(PutKafka.COMPRESSED_TOPICS, "topic01,topic02,topic03"); + runner.setProperty(PutKafka.PARTITION_STRATEGY, PutKafka.USER_DEFINED_PARTITIONING); + runner.setProperty(PutKafka.PARTITION, "${part}"); + runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\n"); - final ProcessContext context = 
runner.getProcessContext(); - final ProducerConfig config = processor.createConfig(context); + final Map attrs = new HashMap<>(); + attrs.put("part", "3"); + runner.enqueue("hello\nthere\nhow are you\ntoday".getBytes(), attrs); - // Check that the codec is snappy - final CompressionCodec codec = config.compressionCodec(); - assertTrue(codec instanceof kafka.message.SnappyCompressionCodec$); + runner.run(2); - // Check compressed topics - final Seq compressedTopics = config.compressedTopics(); - assertEquals(3, compressedTopics.size()); - assertTrue(compressedTopics.contains("topic01")); - assertTrue(compressedTopics.contains("topic02")); - assertTrue(compressedTopics.contains("topic03")); - - // Check the producer type - final String actualProducerType = config.producerType(); - assertEquals("async", actualProducerType); + runner.assertAllFlowFilesTransferred(PutKafka.REL_SUCCESS, 1); + final List> records = ((MockProducer) proc.getProducer()).getMessages(); + for (int i = 0; i < 4; i++) { + assertEquals(3, records.get(i).partition().intValue()); + } } + + @Test - public void testProducerConfigAsyncQueueThresholds() { - - final TestableProcessor processor = new TestableProcessor(); - final TestRunner runner = TestRunners.newTestRunner(processor); + public void testUserDefinedPartitionWithInvalidValue() { + final TestableProcessor proc = new TestableProcessor(); + final TestRunner runner = TestRunners.newTestRunner(proc); runner.setProperty(PutKafka.TOPIC, "topic1"); runner.setProperty(PutKafka.KEY, "key1"); runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234"); - runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n"); - runner.setProperty(PutKafka.PRODUCER_TYPE, PutKafka.PRODUCTER_TYPE_ASYNCHRONOUS.getValue()); - runner.setProperty(PutKafka.QUEUE_BUFFERING_MAX, "7 secs"); - runner.setProperty(PutKafka.QUEUE_BUFFERING_MAX_MESSAGES, "535"); - runner.setProperty(PutKafka.QUEUE_ENQUEUE_TIMEOUT, "200 ms"); + runner.setProperty(PutKafka.PARTITION_STRATEGY, PutKafka.USER_DEFINED_PARTITIONING); + runner.setProperty(PutKafka.PARTITION, "${part}"); + runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\n"); - final ProcessContext context = runner.getProcessContext(); - final ProducerConfig config = processor.createConfig(context); + final Map attrs = new HashMap<>(); + attrs.put("part", "bogus"); + runner.enqueue("hello\nthere\nhow are you\ntoday".getBytes(), attrs); - // Check that the queue thresholds were properly translated - assertEquals(7000, config.queueBufferingMaxMs()); - assertEquals(535, config.queueBufferingMaxMessages()); - assertEquals(200, config.queueEnqueueTimeoutMs()); + runner.run(2); - // Check the producer type - final String actualProducerType = config.producerType(); - assertEquals("async", actualProducerType); + runner.assertAllFlowFilesTransferred(PutKafka.REL_SUCCESS, 1); + final List> records = ((MockProducer) proc.getProducer()).getMessages(); + // should all be the same partition, regardless of what partition it is. 
+ final int partition = records.get(0).partition().intValue(); + + for (int i = 0; i < 4; i++) { + assertEquals(partition, records.get(i).partition().intValue()); + } } + @Test - public void testProducerConfigInvalidBatchSize() { - - final TestableProcessor processor = new TestableProcessor(); - final TestRunner runner = TestRunners.newTestRunner(processor); + public void testFullBuffer() { + final TestableProcessor proc = new TestableProcessor(); + final TestRunner runner = TestRunners.newTestRunner(proc); runner.setProperty(PutKafka.TOPIC, "topic1"); runner.setProperty(PutKafka.KEY, "key1"); runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234"); - runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n"); - runner.setProperty(PutKafka.PRODUCER_TYPE, PutKafka.PRODUCTER_TYPE_ASYNCHRONOUS.getValue()); - runner.setProperty(PutKafka.BATCH_NUM_MESSAGES, "200"); - runner.setProperty(PutKafka.QUEUE_BUFFERING_MAX_MESSAGES, "100"); + runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\n"); + runner.setProperty(PutKafka.MAX_BUFFER_SIZE, "5 B"); + proc.setMaxQueueSize(10L); // will take 4 bytes for key and 1 byte for value. - runner.assertNotValid(); + runner.enqueue("1\n2\n3\n4\n".getBytes()); + runner.run(2); + runner.assertTransferCount(PutKafka.REL_SUCCESS, 1); + runner.assertTransferCount(PutKafka.REL_FAILURE, 1); + + runner.getFlowFilesForRelationship(PutKafka.REL_SUCCESS).get(0).assertContentEquals("1\n2\n"); + runner.getFlowFilesForRelationship(PutKafka.REL_FAILURE).get(0).assertContentEquals("3\n4\n"); } - @Test - public void testProducerConfigAsyncDefaultEnqueueTimeout() { - - final TestableProcessor processor = new TestableProcessor(); - final TestRunner runner = TestRunners.newTestRunner(processor); - - runner.setProperty(PutKafka.TOPIC, "topic1"); - runner.setProperty(PutKafka.KEY, "key1"); - runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234"); - runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n"); - runner.setProperty(PutKafka.PRODUCER_TYPE, PutKafka.PRODUCTER_TYPE_ASYNCHRONOUS.getValue()); - // Do not set QUEUE_ENQUEUE_TIMEOUT - - final ProcessContext context = runner.getProcessContext(); - final ProducerConfig config = processor.createConfig(context); - - // Check that the enqueue timeout defaults to -1 - assertEquals(-1, config.queueEnqueueTimeoutMs()); - - // Check the producer type - final String actualProducerType = config.producerType(); - assertEquals("async", actualProducerType); - - } + /** + * Used to override the {@link #getProducer()} method so that we can enforce that our MockProducer is used + */ private static class TestableProcessor extends PutKafka { - - private MockProducer producer; - private int failAfter = Integer.MAX_VALUE; + private final MockProducer producer; public TestableProcessor() { + this(null); } - public TestableProcessor(final int failAfter) { - this.failAfter = failAfter; + public TestableProcessor(final Integer failAfter) { + this(failAfter, null); } - @OnScheduled - public void instantiateProducer(final ProcessContext context) { - producer = new MockProducer(createConfig(context)); + public TestableProcessor(final Integer failAfter, final Integer stopFailingAfter) { + producer = new MockProducer(); producer.setFailAfter(failAfter); + producer.setStopFailingAfter(stopFailingAfter); } @Override - protected Producer createProducer(final ProcessContext context) { + protected Producer getProducer() { return producer; } - public MockProducer getProducer() { - return producer; - } - - /** - * Exposed for test verification - */ - @Override - 
public ProducerConfig createConfig(final ProcessContext context) { - return super.createConfig(context); + public void setMaxQueueSize(final long bytes) { + producer.setMaxQueueSize(bytes); } } - private static class MockProducer extends Producer { + + /** + * We have our own Mock Producer, which is very similar to the Kafka-supplied one. However, with the Kafka-supplied + * Producer, we don't have the ability to tell it to fail after X number of messages; rather, we can only tell it + * to fail on the next message. Since we are sending multiple messages in a single onTrigger call for the Processor, + * this doesn't allow us to test failure conditions adequately. + */ + private static class MockProducer implements Producer { private int sendCount = 0; - private int failAfter = Integer.MAX_VALUE; + private Integer failAfter; + private Integer stopFailingAfter; + private long queueSize = 0L; + private long maxQueueSize = Long.MAX_VALUE; - private final List messages = new ArrayList<>(); + private final List> messages = new ArrayList<>(); - public MockProducer(final ProducerConfig config) { - super(config); + public MockProducer() { } - @Override - public void send(final KeyedMessage message) { - if (++sendCount > failAfter) { - throw new FailedToSendMessageException("Failed to send message", new RuntimeException("Unit test told to fail after " + failAfter + " successful messages")); - } else { - messages.add(message.message()); - } + public void setMaxQueueSize(final long bytes) { + this.maxQueueSize = bytes; } - public List getMessages() { + public List> getMessages() { return messages; } - @Override - public void send(final List> messages) { - for (final KeyedMessage msg : messages) { - send(msg); - } + public void setFailAfter(final Integer successCount) { + failAfter = successCount; } - public void setFailAfter(final int successCount) { - failAfter = successCount; + public void setStopFailingAfter(final Integer stopFailingAfter) { + this.stopFailingAfter = stopFailingAfter; + } + + @Override + public Future send(final ProducerRecord record) { + return send(record, null); + } + + @Override + public Future send(final ProducerRecord record, final Callback callback) { + sendCount++; + + final ByteArraySerializer serializer = new ByteArraySerializer(); + final int keyBytes = serializer.serialize(record.topic(), record.key()).length; + final int valueBytes = serializer.serialize(record.topic(), record.value()).length; + if (maxQueueSize - queueSize < keyBytes + valueBytes) { + throw new BufferExhaustedException("Queue size is " + queueSize + " but serialized message is " + (keyBytes + valueBytes)); + } + + queueSize += keyBytes + valueBytes; + + if (failAfter != null && sendCount > failAfter && ((stopFailingAfter == null) || (sendCount < stopFailingAfter + 1))) { + final Exception e = new FailedToSendMessageException("Failed to send message", new RuntimeException("Unit test told to fail after " + failAfter + " successful messages")); + callback.onCompletion(null, e); + } else { + messages.add(record); + final RecordMetadata meta = new RecordMetadata(new TopicPartition(record.topic(), record.partition() == null ? 
 
-    private static class MockProducer extends Producer<byte[], byte[]> {
+
+    /**
+     * We have our own Mock Producer, which is very similar to the Kafka-supplied one. However, with the Kafka-supplied
+     * Producer, we don't have the ability to tell it to fail after X number of messages; rather, we can only tell it
+     * to fail on the next message. Since we are sending multiple messages in a single onTrigger call for the Processor,
+     * this doesn't allow us to test failure conditions adequately.
+     */
+    private static class MockProducer implements Producer<byte[], byte[]> {
         private int sendCount = 0;
-        private int failAfter = Integer.MAX_VALUE;
+        private Integer failAfter;
+        private Integer stopFailingAfter;
+        private long queueSize = 0L;
+        private long maxQueueSize = Long.MAX_VALUE;
 
-        private final List<byte[]> messages = new ArrayList<>();
+        private final List<ProducerRecord<byte[], byte[]>> messages = new ArrayList<>();
 
-        public MockProducer(final ProducerConfig config) {
-            super(config);
+        public MockProducer() {
         }
 
-        @Override
-        public void send(final KeyedMessage<byte[], byte[]> message) {
-            if (++sendCount > failAfter) {
-                throw new FailedToSendMessageException("Failed to send message", new RuntimeException("Unit test told to fail after " + failAfter + " successful messages"));
-            } else {
-                messages.add(message.message());
-            }
+        public void setMaxQueueSize(final long bytes) {
+            this.maxQueueSize = bytes;
         }
 
-        public List<byte[]> getMessages() {
+        public List<ProducerRecord<byte[], byte[]>> getMessages() {
             return messages;
         }
 
-        @Override
-        public void send(final List<KeyedMessage<byte[], byte[]>> messages) {
-            for (final KeyedMessage<byte[], byte[]> msg : messages) {
-                send(msg);
-            }
+        public void setFailAfter(final Integer successCount) {
+            failAfter = successCount;
         }
 
-        public void setFailAfter(final int successCount) {
-            failAfter = successCount;
+        public void setStopFailingAfter(final Integer stopFailingAfter) {
+            this.stopFailingAfter = stopFailingAfter;
+        }
+
+        @Override
+        public Future<RecordMetadata> send(final ProducerRecord<byte[], byte[]> record) {
+            return send(record, null);
+        }
+
+        @Override
+        public Future<RecordMetadata> send(final ProducerRecord<byte[], byte[]> record, final Callback callback) {
+            sendCount++;
+
+            final ByteArraySerializer serializer = new ByteArraySerializer();
+            final int keyBytes = serializer.serialize(record.topic(), record.key()).length;
+            final int valueBytes = serializer.serialize(record.topic(), record.value()).length;
+            if (maxQueueSize - queueSize < keyBytes + valueBytes) {
+                throw new BufferExhaustedException("Queue size is " + queueSize + " but serialized message is " + (keyBytes + valueBytes));
+            }
+
+            queueSize += keyBytes + valueBytes;
+
+            if (failAfter != null && sendCount > failAfter && ((stopFailingAfter == null) || (sendCount < stopFailingAfter + 1))) {
+                final Exception e = new FailedToSendMessageException("Failed to send message", new RuntimeException("Unit test told to fail after " + failAfter + " successful messages"));
+                callback.onCompletion(null, e);
+            } else {
+                messages.add(record);
+                final RecordMetadata meta = new RecordMetadata(new TopicPartition(record.topic(), record.partition() == null ? 1 : record.partition()), 0L, 0L);
+                callback.onCompletion(meta, null);
+            }
+
+            // we don't actually look at the Future in the processor, so we can just return null
+            return null;
+        }
+
+        @Override
+        public List<PartitionInfo> partitionsFor(String topic) {
+            final Node leader = new Node(1, "localhost", 1111);
+            final Node node2 = new Node(2, "localhost-2", 2222);
+            final Node node3 = new Node(3, "localhost-3", 3333);
+
+            final PartitionInfo partInfo1 = new PartitionInfo(topic, 1, leader, new Node[] {node2, node3}, new Node[0]);
+            final PartitionInfo partInfo2 = new PartitionInfo(topic, 2, leader, new Node[] {node2, node3}, new Node[0]);
+            final PartitionInfo partInfo3 = new PartitionInfo(topic, 3, leader, new Node[] {node2, node3}, new Node[0]);
+
+            final List<PartitionInfo> infos = new ArrayList<>(3);
+            infos.add(partInfo1);
+            infos.add(partInfo2);
+            infos.add(partInfo3);
+            return infos;
+        }
+
+        @Override
+        public Map<MetricName, ? extends Metric> metrics() {
+            return Collections.emptyMap();
+        }
+
+        @Override
+        public void close() { }
     }
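The fail window in MockProducer.send(ProducerRecord, Callback) is the part of the class that is easiest to misread, so here is that predicate pulled out on its own (a standalone sketch; the class and method names are illustrative). With failAfter = 1 and stopFailingAfter = 3, send 1 succeeds, sends 2 and 3 receive the injected exception, and send 4 succeeds again:

public class FailWindowSketch {

    // mirrors the condition used in MockProducer#send(ProducerRecord, Callback) above
    static boolean shouldFail(int sendCount, Integer failAfter, Integer stopFailingAfter) {
        return failAfter != null
                && sendCount > failAfter
                && (stopFailingAfter == null || sendCount < stopFailingAfter + 1);
    }

    public static void main(String[] args) {
        // failAfter = 1, stopFailingAfter = 3: send 1 succeeds, sends 2-3 fail, sends 4+ succeed again
        for (int sendCount = 1; sendCount <= 5; sendCount++) {
            System.out.println("send #" + sendCount + " fails? " + shouldFail(sendCount, 1, 3));
        }
    }
}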
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
index eed6eb2f11..452df4298b 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
@@ -168,8 +168,12 @@ public class ExecuteSQL extends AbstractProcessor {
                 session.getProvenanceReporter().modifyContent(outgoing, "Retrieved " + nrOfRows.get() + " rows", stopWatch.getElapsed(TimeUnit.MILLISECONDS));
                 session.transfer(outgoing, REL_SUCCESS);
             } catch (final ProcessException | SQLException e) {
-                logger.error("Unable to execute SQL select query {} for {} due to {}; routing to failure", new Object[] { selectQuery, incoming, e });
-                session.transfer(incoming, REL_FAILURE);
+                if (incoming == null) {
+                    logger.error("Unable to execute SQL select query {} due to {}. No incoming flow file to route to failure", new Object[] {selectQuery, e});
+                } else {
+                    logger.error("Unable to execute SQL select query {} for {} due to {}; routing to failure", new Object[] {selectQuery, incoming, e});
+                    session.transfer(incoming, REL_FAILURE);
+                }
             }
         }
     }
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java
index b7e7a9cd4d..27f293859e 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java
@@ -114,6 +114,7 @@ public class PutSyslog extends AbstractSyslogProcessor {
                 "\"yyyy-MM-dd'T'HH:mm:ss.SZ\" or \"yyyy-MM-dd'T'HH:mm:ss.S+hh:mm\", \" or it can be an RFC3164 timestamp " +
                 "with a format of \"MMM d HH:mm:ss\".")
             .required(true)
+            .defaultValue("${now():format('MMM d HH:mm:ss')}")
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .expressionLanguageSupported(true)
            .build();
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/TestDynamicEnvironment.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/TestDynamicEnvironment.java
new file mode 100644
index 0000000000..3e6cad2fc9
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/TestDynamicEnvironment.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.util.Map;
+
+public class TestDynamicEnvironment {
+    public static void main(String[] args) {
+        // iterate through current environment and print out all properties starting with NIFI
+        for (Map.Entry<String, String> env: System.getenv().entrySet()) {
+            if (env.getKey().startsWith("NIFI")) {
+                System.out.println(env.getKey() + "=" + env.getValue());
+            }
+        }
+    }
+}
\ No newline at end of file
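For the PutSyslog change above: the new default value ${now():format('MMM d HH:mm:ss')} produces an RFC3164-style timestamp at flow file send time. The same format string behaves as follows in plain Java (a sketch for illustration only; note that single-digit days are not zero-padded with the single 'd' pattern):

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;

public class Rfc3164TimestampSketch {
    public static void main(String[] args) {
        // same pattern as the new PutSyslog default value
        final SimpleDateFormat rfc3164 = new SimpleDateFormat("MMM d HH:mm:ss", Locale.US);
        System.out.println(rfc3164.format(new Date()));   // e.g. "Nov 3 09:15:42"
    }
}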
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/TestIngestAndUpdate.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/TestIngestAndUpdate.java
new file mode 100644
index 0000000000..c9ed9f9631
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/TestIngestAndUpdate.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+
+public class TestIngestAndUpdate {
+
+    public static void main(String[] args) throws IOException {
+        byte[] bytes = new byte[1024];
+        System.out.write(System.getProperty("user.dir").getBytes());
+        System.out.println(":ModifiedResult");
+        int numRead = 0;
+        while ((numRead = System.in.read(bytes)) != -1) {
+            System.out.write(bytes, 0, numRead);
+        }
+        System.out.flush();
+        System.out.close();
+    }
+
+}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/TestSuccess.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/TestSuccess.java
new file mode 100644
index 0000000000..3c74d54896
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/TestSuccess.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+public class TestSuccess {
+
+    public static void main(String[] args) {
+        System.out.println("Test was a success");
+    }
+
+}
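TestIngestAndUpdate and TestSuccess are plain main() fixtures: the first prints the working directory followed by ":ModifiedResult" and then echoes whatever arrives on stdin, the second only prints a success marker. A small harness such as the following shows the shape of the output a caller sees; it is a sketch under the assumption that the fixture classes have been compiled into the working directory (the default "." classpath) and that java is on the PATH:

import java.io.InputStream;
import java.io.OutputStream;

public class RunFixturesSketch {
    public static void main(String[] args) throws Exception {
        // launch the compiled fixture; stderr is folded into stdout for simplicity
        final Process process = new ProcessBuilder("java", "TestIngestAndUpdate")
                .redirectErrorStream(true)
                .start();

        // feed some flow-file-like content on stdin; the fixture echoes it back after its header line
        try (OutputStream stdin = process.getOutputStream()) {
            stdin.write("hello\n".getBytes());
        }

        // expected shape: "<working directory>:ModifiedResult" followed by "hello"
        try (InputStream stdout = process.getInputStream()) {
            int b;
            while ((b = stdout.read()) != -1) {
                System.out.write(b);
            }
        }
        System.out.flush();
        process.waitFor();
    }
}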