diff --git a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/AbstractCouchbaseProcessor.java b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/AbstractCouchbaseProcessor.java index 066b1cace6..b8790417dc 100644 --- a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/AbstractCouchbaseProcessor.java +++ b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/AbstractCouchbaseProcessor.java @@ -175,18 +175,32 @@ public abstract class AbstractCouchbaseProcessor extends AbstractProcessor { /** * Handles the thrown CocuhbaseException accordingly. + * @param context a process context * @param session a process session * @param logger a logger * @param inFile an input FlowFile * @param e the thrown CouchbaseException * @param errMsg a message to be logged */ - protected void handleCouchbaseException(final ProcessSession session, + protected void handleCouchbaseException(final ProcessContext context, final ProcessSession session, final ProcessorLog logger, FlowFile inFile, CouchbaseException e, String errMsg) { logger.error(errMsg, e); if(inFile != null){ ErrorHandlingStrategy strategy = CouchbaseExceptionMappings.getStrategy(e); + switch(strategy.penalty()) { + case Penalize: + if(logger.isDebugEnabled()) logger.debug("Penalized: {}", new Object[]{inFile}); + inFile = session.penalize(inFile); + break; + case Yield: + if(logger.isDebugEnabled()) logger.debug("Yielded context: {}", new Object[]{inFile}); + context.yield(); + break; + case None: + break; + } + switch(strategy.result()) { case ProcessException: throw new ProcessException(errMsg, e); diff --git a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/ErrorHandlingStrategy.java b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/ErrorHandlingStrategy.java index 75b8f46286..bae35d5c30 100644 --- a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/ErrorHandlingStrategy.java +++ b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/ErrorHandlingStrategy.java @@ -16,6 +16,7 @@ */ package org.apache.nifi.processors.couchbase; +import static org.apache.nifi.processors.couchbase.ErrorHandlingStrategy.Penalty.None; import static org.apache.nifi.processors.couchbase.ErrorHandlingStrategy.Penalty.Penalize; import static org.apache.nifi.processors.couchbase.ErrorHandlingStrategy.Penalty.Yield; import static org.apache.nifi.processors.couchbase.ErrorHandlingStrategy.Result.Failure; @@ -25,11 +26,32 @@ import static org.apache.nifi.processors.couchbase.ErrorHandlingStrategy.Result. public enum ErrorHandlingStrategy { + /** + * Processor setting has to be fixed, in order to NOT call failing processor + * frequently, this it be yielded. + */ ConfigurationError(ProcessException, Yield), - InvalidInput(Failure, Penalize), + /** + * The input FlowFile will be sent to the failure relationship for further + * processing without penalizing. Basically, the FlowFile shouldn't be sent + * this processor again unless the issue has been solved. + */ + InvalidInput(Failure, None), + /** + * Couchbase cluster is in unhealthy state. Retrying maybe successful, + * but it should be yielded for a while. + */ TemporalClusterError(Retry, Yield), + /** + * The FlowFile was not processed successfully due to some temporal error + * related to this specific FlowFile or document. Retrying maybe successful, + * but it should be penalized for a while. + */ TemporalFlowFileError(Retry, Penalize), - Fatal(Failure, Yield); + /** + * The error can't be recovered without DataFlow Manager intervention. + */ + Fatal(Retry, Yield); private final Result result; private final Penalty penalty; @@ -46,7 +68,7 @@ public enum ErrorHandlingStrategy { * Indicating yield or penalize the processing when transfer the input FlowFile. */ public enum Penalty { - Yield, Penalize; + Yield, Penalize, None; } public Result result(){ diff --git a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/GetCouchbaseKey.java b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/GetCouchbaseKey.java index 8c15e2999d..4aa96777cc 100644 --- a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/GetCouchbaseKey.java +++ b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/GetCouchbaseKey.java @@ -89,21 +89,17 @@ public class GetCouchbaseKey extends AbstractCouchbaseProcessor { FlowFile inFile = session.get(); String docId = null; - try { - if(!StringUtils.isEmpty(context.getProperty(DOC_ID).getValue())){ - docId = context.getProperty(DOC_ID).evaluateAttributeExpressions(inFile).getValue(); - } else { - final byte[] content = new byte[(int) inFile.getSize()]; - session.read(inFile, new InputStreamCallback() { - @Override - public void process(final InputStream in) throws IOException { - StreamUtils.fillBuffer(in, content, true); - } - }); - docId = new String(content, StandardCharsets.UTF_8); - } - } catch (Throwable t) { - throw new ProcessException("Please check 'Document Id' setting. Couldn't get document id from " + inFile); + if(!StringUtils.isEmpty(context.getProperty(DOC_ID).getValue())){ + docId = context.getProperty(DOC_ID).evaluateAttributeExpressions(inFile).getValue(); + } else { + final byte[] content = new byte[(int) inFile.getSize()]; + session.read(inFile, new InputStreamCallback() { + @Override + public void process(final InputStream in) throws IOException { + StreamUtils.fillBuffer(in, content, true); + } + }); + docId = new String(content, StandardCharsets.UTF_8); } if(StringUtils.isEmpty(docId)){ @@ -163,7 +159,7 @@ public class GetCouchbaseKey extends AbstractCouchbaseProcessor { } catch (CouchbaseException e){ String errMsg = String.format("Getting docuement %s from Couchbase Server using %s failed due to %s", docId, inFile, e); - handleCouchbaseException(session, logger, inFile, e, errMsg); + handleCouchbaseException(context, session, logger, inFile, e, errMsg); } } diff --git a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/PutCouchbaseKey.java b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/PutCouchbaseKey.java index 8f413830e1..2aa803c0eb 100644 --- a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/PutCouchbaseKey.java +++ b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/PutCouchbaseKey.java @@ -99,6 +99,7 @@ public class PutCouchbaseKey extends AbstractCouchbaseProcessor { @Override protected void addSupportedRelationships(Set relationships) { relationships.add(REL_SUCCESS); + relationships.add(REL_RETRY); relationships.add(REL_FAILURE); } @@ -110,7 +111,6 @@ public class PutCouchbaseKey extends AbstractCouchbaseProcessor { return; } - String docId = null; final byte[] content = new byte[(int) flowFile.getSize()]; session.read(flowFile, new InputStreamCallback() { @Override @@ -119,13 +119,9 @@ public class PutCouchbaseKey extends AbstractCouchbaseProcessor { } }); - try { - docId = String.valueOf(flowFile.getAttribute(CoreAttributes.UUID.key())); - if(!StringUtils.isEmpty(context.getProperty(DOC_ID).getValue())){ - docId = context.getProperty(DOC_ID).evaluateAttributeExpressions(flowFile).getValue(); - } - } catch (Throwable t) { - throw new ProcessException("Please check 'Document Id' setting. Couldn't get document id from " + flowFile); + String docId = String.valueOf(flowFile.getAttribute(CoreAttributes.UUID.key())); + if(!StringUtils.isEmpty(context.getProperty(DOC_ID).getValue())){ + docId = context.getProperty(DOC_ID).evaluateAttributeExpressions(flowFile).getValue(); } try { @@ -158,7 +154,7 @@ public class PutCouchbaseKey extends AbstractCouchbaseProcessor { } catch (CouchbaseException e) { String errMsg = String.format("Writing docuement %s to Couchbase Server using %s failed due to %s", docId, flowFile, e); - handleCouchbaseException(session, logger, flowFile, e, errMsg); + handleCouchbaseException(context, session, logger, flowFile, e, errMsg); } } diff --git a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestGetCouchbaseKey.java b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestGetCouchbaseKey.java index dca2ae31e4..108980c35a 100644 --- a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestGetCouchbaseKey.java +++ b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestGetCouchbaseKey.java @@ -34,6 +34,7 @@ import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.Map; +import org.apache.nifi.attribute.expression.language.exception.AttributeExpressionLanguageException; import org.apache.nifi.couchbase.CouchbaseAttributes; import org.apache.nifi.couchbase.CouchbaseClusterControllerService; import org.apache.nifi.processor.exception.ProcessException; @@ -181,9 +182,9 @@ public class TestGetCouchbaseKey { try { testRunner.run(); - fail("ProcessException should be throws."); + fail("Exception should be thrown."); } catch (AssertionError e){ - Assert.assertTrue(e.getCause().getClass().equals(ProcessException.class)); + Assert.assertTrue(e.getCause().getClass().equals(AttributeExpressionLanguageException.class)); } testRunner.assertTransferCount(REL_SUCCESS, 0); @@ -208,9 +209,9 @@ public class TestGetCouchbaseKey { testRunner.enqueue(inFileData, properties); try { testRunner.run(); - fail("ProcessException should be throws."); + fail("Exception should be thrown."); } catch (AssertionError e){ - Assert.assertTrue(e.getCause().getClass().equals(ProcessException.class)); + Assert.assertTrue(e.getCause().getClass().equals(AttributeExpressionLanguageException.class)); } testRunner.assertTransferCount(REL_SUCCESS, 0); @@ -286,7 +287,7 @@ public class TestGetCouchbaseKey { testRunner.enqueue(inFileData); try { testRunner.run(); - fail("ProcessException should be throws."); + fail("ProcessException should be thrown."); } catch (AssertionError e){ Assert.assertTrue(e.getCause().getClass().equals(ProcessException.class)); } @@ -313,7 +314,7 @@ public class TestGetCouchbaseKey { testRunner.enqueue(inFileData); try { testRunner.run(); - fail("ProcessException should be throws."); + fail("ProcessException should be thrown."); } catch (AssertionError e){ Assert.assertTrue(e.getCause().getClass().equals(ProcessException.class)); Assert.assertTrue(e.getCause().getCause().getClass().equals(AuthenticationException.class)); @@ -424,9 +425,9 @@ public class TestGetCouchbaseKey { testRunner.assertTransferCount(REL_SUCCESS, 0); testRunner.assertTransferCount(REL_ORIGINAL, 0); - testRunner.assertTransferCount(REL_RETRY, 0); - testRunner.assertTransferCount(REL_FAILURE, 1); - MockFlowFile orgFile = testRunner.getFlowFilesForRelationship(REL_FAILURE).get(0); + testRunner.assertTransferCount(REL_RETRY, 1); + testRunner.assertTransferCount(REL_FAILURE, 0); + MockFlowFile orgFile = testRunner.getFlowFilesForRelationship(REL_RETRY).get(0); orgFile.assertContentEquals(inputFileDataStr); orgFile.assertAttributeEquals(Exception.key(), exception.getClass().getName()); } diff --git a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestPutCouchbaseKey.java b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestPutCouchbaseKey.java index 0388e35b2f..f8705936ff 100644 --- a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestPutCouchbaseKey.java +++ b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestPutCouchbaseKey.java @@ -37,6 +37,7 @@ import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.Map; +import org.apache.nifi.attribute.expression.language.exception.AttributeExpressionLanguageException; import org.apache.nifi.couchbase.CouchbaseAttributes; import org.apache.nifi.couchbase.CouchbaseClusterControllerService; import org.apache.nifi.flowfile.attributes.CoreAttributes; @@ -167,7 +168,10 @@ public class TestPutCouchbaseKey { testRunner.enqueue(inFileDataBytes, properties); testRunner.run(); - verify(bucket, times(1)).upsert(any(RawJsonDocument.class), eq(PersistTo.NONE), eq(ReplicateTo.NONE)); + ArgumentCaptor capture = ArgumentCaptor.forClass(RawJsonDocument.class); + verify(bucket, times(1)).upsert(capture.capture(), eq(PersistTo.NONE), eq(ReplicateTo.NONE)); + assertEquals(somePropertyValue, capture.getValue().id()); + assertEquals(inFileData, capture.getValue().content()); testRunner.assertTransferCount(REL_SUCCESS, 1); testRunner.assertTransferCount(REL_RETRY, 0); @@ -196,9 +200,9 @@ public class TestPutCouchbaseKey { testRunner.enqueue(inFileDataBytes, properties); try { testRunner.run(); - fail("ProcessException should be throws."); + fail("Exception should be thrown."); } catch (AssertionError e){ - Assert.assertTrue(e.getCause().getClass().equals(ProcessException.class)); + Assert.assertTrue(e.getCause().getClass().equals(AttributeExpressionLanguageException.class)); } testRunner.assertTransferCount(REL_SUCCESS, 0); @@ -226,6 +230,7 @@ public class TestPutCouchbaseKey { ArgumentCaptor capture = ArgumentCaptor.forClass(RawJsonDocument.class); verify(bucket, times(1)).upsert(capture.capture(), eq(PersistTo.NONE), eq(ReplicateTo.NONE)); assertEquals(uuid, capture.getValue().id()); + assertEquals(inFileData, capture.getValue().content()); testRunner.assertTransferCount(REL_SUCCESS, 1); testRunner.assertTransferCount(REL_RETRY, 0); @@ -253,7 +258,7 @@ public class TestPutCouchbaseKey { testRunner.setProperty(PutCouchbaseKey.REPLICATE_TO, ReplicateTo.ONE.toString()); try { testRunner.run(); - fail("ProcessException should be throws."); + fail("ProcessException should be thrown."); } catch (AssertionError e){ Assert.assertTrue(e.getCause().getClass().equals(ProcessException.class)); }