nifi-992 Improvements based on code review part II.

- Penalize or Yield based on ErrorHandlingStrategy.Penalty
- Add Retry relationship to PutCouchbaseKey
- Remove unnecessary try/catch and let the framework handle it
- Change CouchbaseException relation mapping for Fatal from Failure to Retry

Signed-off-by: Bryan Bende <bbende@apache.org>
This commit is contained in:
ijokarumawak 2015-10-01 15:05:29 +09:00 committed by Bryan Bende
parent 72eb64e8a4
commit 033a1553ab
6 changed files with 76 additions and 42 deletions

View File

@ -175,18 +175,32 @@ public abstract class AbstractCouchbaseProcessor extends AbstractProcessor {
/** /**
* Handles the thrown CocuhbaseException accordingly. * Handles the thrown CocuhbaseException accordingly.
* @param context a process context
* @param session a process session * @param session a process session
* @param logger a logger * @param logger a logger
* @param inFile an input FlowFile * @param inFile an input FlowFile
* @param e the thrown CouchbaseException * @param e the thrown CouchbaseException
* @param errMsg a message to be logged * @param errMsg a message to be logged
*/ */
protected void handleCouchbaseException(final ProcessSession session, protected void handleCouchbaseException(final ProcessContext context, final ProcessSession session,
final ProcessorLog logger, FlowFile inFile, CouchbaseException e, final ProcessorLog logger, FlowFile inFile, CouchbaseException e,
String errMsg) { String errMsg) {
logger.error(errMsg, e); logger.error(errMsg, e);
if(inFile != null){ if(inFile != null){
ErrorHandlingStrategy strategy = CouchbaseExceptionMappings.getStrategy(e); ErrorHandlingStrategy strategy = CouchbaseExceptionMappings.getStrategy(e);
switch(strategy.penalty()) {
case Penalize:
if(logger.isDebugEnabled()) logger.debug("Penalized: {}", new Object[]{inFile});
inFile = session.penalize(inFile);
break;
case Yield:
if(logger.isDebugEnabled()) logger.debug("Yielded context: {}", new Object[]{inFile});
context.yield();
break;
case None:
break;
}
switch(strategy.result()) { switch(strategy.result()) {
case ProcessException: case ProcessException:
throw new ProcessException(errMsg, e); throw new ProcessException(errMsg, e);

View File

@ -16,6 +16,7 @@
*/ */
package org.apache.nifi.processors.couchbase; package org.apache.nifi.processors.couchbase;
import static org.apache.nifi.processors.couchbase.ErrorHandlingStrategy.Penalty.None;
import static org.apache.nifi.processors.couchbase.ErrorHandlingStrategy.Penalty.Penalize; import static org.apache.nifi.processors.couchbase.ErrorHandlingStrategy.Penalty.Penalize;
import static org.apache.nifi.processors.couchbase.ErrorHandlingStrategy.Penalty.Yield; import static org.apache.nifi.processors.couchbase.ErrorHandlingStrategy.Penalty.Yield;
import static org.apache.nifi.processors.couchbase.ErrorHandlingStrategy.Result.Failure; import static org.apache.nifi.processors.couchbase.ErrorHandlingStrategy.Result.Failure;
@ -25,11 +26,32 @@ import static org.apache.nifi.processors.couchbase.ErrorHandlingStrategy.Result.
public enum ErrorHandlingStrategy { public enum ErrorHandlingStrategy {
/**
* Processor setting has to be fixed, in order to NOT call failing processor
* frequently, this it be yielded.
*/
ConfigurationError(ProcessException, Yield), ConfigurationError(ProcessException, Yield),
InvalidInput(Failure, Penalize), /**
* The input FlowFile will be sent to the failure relationship for further
* processing without penalizing. Basically, the FlowFile shouldn't be sent
* this processor again unless the issue has been solved.
*/
InvalidInput(Failure, None),
/**
* Couchbase cluster is in unhealthy state. Retrying maybe successful,
* but it should be yielded for a while.
*/
TemporalClusterError(Retry, Yield), TemporalClusterError(Retry, Yield),
/**
* The FlowFile was not processed successfully due to some temporal error
* related to this specific FlowFile or document. Retrying maybe successful,
* but it should be penalized for a while.
*/
TemporalFlowFileError(Retry, Penalize), TemporalFlowFileError(Retry, Penalize),
Fatal(Failure, Yield); /**
* The error can't be recovered without DataFlow Manager intervention.
*/
Fatal(Retry, Yield);
private final Result result; private final Result result;
private final Penalty penalty; private final Penalty penalty;
@ -46,7 +68,7 @@ public enum ErrorHandlingStrategy {
* Indicating yield or penalize the processing when transfer the input FlowFile. * Indicating yield or penalize the processing when transfer the input FlowFile.
*/ */
public enum Penalty { public enum Penalty {
Yield, Penalize; Yield, Penalize, None;
} }
public Result result(){ public Result result(){

View File

@ -89,21 +89,17 @@ public class GetCouchbaseKey extends AbstractCouchbaseProcessor {
FlowFile inFile = session.get(); FlowFile inFile = session.get();
String docId = null; String docId = null;
try { if(!StringUtils.isEmpty(context.getProperty(DOC_ID).getValue())){
if(!StringUtils.isEmpty(context.getProperty(DOC_ID).getValue())){ docId = context.getProperty(DOC_ID).evaluateAttributeExpressions(inFile).getValue();
docId = context.getProperty(DOC_ID).evaluateAttributeExpressions(inFile).getValue(); } else {
} else { final byte[] content = new byte[(int) inFile.getSize()];
final byte[] content = new byte[(int) inFile.getSize()]; session.read(inFile, new InputStreamCallback() {
session.read(inFile, new InputStreamCallback() { @Override
@Override public void process(final InputStream in) throws IOException {
public void process(final InputStream in) throws IOException { StreamUtils.fillBuffer(in, content, true);
StreamUtils.fillBuffer(in, content, true); }
} });
}); docId = new String(content, StandardCharsets.UTF_8);
docId = new String(content, StandardCharsets.UTF_8);
}
} catch (Throwable t) {
throw new ProcessException("Please check 'Document Id' setting. Couldn't get document id from " + inFile);
} }
if(StringUtils.isEmpty(docId)){ if(StringUtils.isEmpty(docId)){
@ -163,7 +159,7 @@ public class GetCouchbaseKey extends AbstractCouchbaseProcessor {
} catch (CouchbaseException e){ } catch (CouchbaseException e){
String errMsg = String.format("Getting docuement %s from Couchbase Server using %s failed due to %s", docId, inFile, e); String errMsg = String.format("Getting docuement %s from Couchbase Server using %s failed due to %s", docId, inFile, e);
handleCouchbaseException(session, logger, inFile, e, errMsg); handleCouchbaseException(context, session, logger, inFile, e, errMsg);
} }
} }

View File

@ -99,6 +99,7 @@ public class PutCouchbaseKey extends AbstractCouchbaseProcessor {
@Override @Override
protected void addSupportedRelationships(Set<Relationship> relationships) { protected void addSupportedRelationships(Set<Relationship> relationships) {
relationships.add(REL_SUCCESS); relationships.add(REL_SUCCESS);
relationships.add(REL_RETRY);
relationships.add(REL_FAILURE); relationships.add(REL_FAILURE);
} }
@ -110,7 +111,6 @@ public class PutCouchbaseKey extends AbstractCouchbaseProcessor {
return; return;
} }
String docId = null;
final byte[] content = new byte[(int) flowFile.getSize()]; final byte[] content = new byte[(int) flowFile.getSize()];
session.read(flowFile, new InputStreamCallback() { session.read(flowFile, new InputStreamCallback() {
@Override @Override
@ -119,13 +119,9 @@ public class PutCouchbaseKey extends AbstractCouchbaseProcessor {
} }
}); });
try { String docId = String.valueOf(flowFile.getAttribute(CoreAttributes.UUID.key()));
docId = String.valueOf(flowFile.getAttribute(CoreAttributes.UUID.key())); if(!StringUtils.isEmpty(context.getProperty(DOC_ID).getValue())){
if(!StringUtils.isEmpty(context.getProperty(DOC_ID).getValue())){ docId = context.getProperty(DOC_ID).evaluateAttributeExpressions(flowFile).getValue();
docId = context.getProperty(DOC_ID).evaluateAttributeExpressions(flowFile).getValue();
}
} catch (Throwable t) {
throw new ProcessException("Please check 'Document Id' setting. Couldn't get document id from " + flowFile);
} }
try { try {
@ -158,7 +154,7 @@ public class PutCouchbaseKey extends AbstractCouchbaseProcessor {
} catch (CouchbaseException e) { } catch (CouchbaseException e) {
String errMsg = String.format("Writing docuement %s to Couchbase Server using %s failed due to %s", docId, flowFile, e); String errMsg = String.format("Writing docuement %s to Couchbase Server using %s failed due to %s", docId, flowFile, e);
handleCouchbaseException(session, logger, flowFile, e, errMsg); handleCouchbaseException(context, session, logger, flowFile, e, errMsg);
} }
} }

View File

@ -34,6 +34,7 @@ import java.nio.charset.StandardCharsets;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import org.apache.nifi.attribute.expression.language.exception.AttributeExpressionLanguageException;
import org.apache.nifi.couchbase.CouchbaseAttributes; import org.apache.nifi.couchbase.CouchbaseAttributes;
import org.apache.nifi.couchbase.CouchbaseClusterControllerService; import org.apache.nifi.couchbase.CouchbaseClusterControllerService;
import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.exception.ProcessException;
@ -181,9 +182,9 @@ public class TestGetCouchbaseKey {
try { try {
testRunner.run(); testRunner.run();
fail("ProcessException should be throws."); fail("Exception should be thrown.");
} catch (AssertionError e){ } catch (AssertionError e){
Assert.assertTrue(e.getCause().getClass().equals(ProcessException.class)); Assert.assertTrue(e.getCause().getClass().equals(AttributeExpressionLanguageException.class));
} }
testRunner.assertTransferCount(REL_SUCCESS, 0); testRunner.assertTransferCount(REL_SUCCESS, 0);
@ -208,9 +209,9 @@ public class TestGetCouchbaseKey {
testRunner.enqueue(inFileData, properties); testRunner.enqueue(inFileData, properties);
try { try {
testRunner.run(); testRunner.run();
fail("ProcessException should be throws."); fail("Exception should be thrown.");
} catch (AssertionError e){ } catch (AssertionError e){
Assert.assertTrue(e.getCause().getClass().equals(ProcessException.class)); Assert.assertTrue(e.getCause().getClass().equals(AttributeExpressionLanguageException.class));
} }
testRunner.assertTransferCount(REL_SUCCESS, 0); testRunner.assertTransferCount(REL_SUCCESS, 0);
@ -286,7 +287,7 @@ public class TestGetCouchbaseKey {
testRunner.enqueue(inFileData); testRunner.enqueue(inFileData);
try { try {
testRunner.run(); testRunner.run();
fail("ProcessException should be throws."); fail("ProcessException should be thrown.");
} catch (AssertionError e){ } catch (AssertionError e){
Assert.assertTrue(e.getCause().getClass().equals(ProcessException.class)); Assert.assertTrue(e.getCause().getClass().equals(ProcessException.class));
} }
@ -313,7 +314,7 @@ public class TestGetCouchbaseKey {
testRunner.enqueue(inFileData); testRunner.enqueue(inFileData);
try { try {
testRunner.run(); testRunner.run();
fail("ProcessException should be throws."); fail("ProcessException should be thrown.");
} catch (AssertionError e){ } catch (AssertionError e){
Assert.assertTrue(e.getCause().getClass().equals(ProcessException.class)); Assert.assertTrue(e.getCause().getClass().equals(ProcessException.class));
Assert.assertTrue(e.getCause().getCause().getClass().equals(AuthenticationException.class)); Assert.assertTrue(e.getCause().getCause().getClass().equals(AuthenticationException.class));
@ -424,9 +425,9 @@ public class TestGetCouchbaseKey {
testRunner.assertTransferCount(REL_SUCCESS, 0); testRunner.assertTransferCount(REL_SUCCESS, 0);
testRunner.assertTransferCount(REL_ORIGINAL, 0); testRunner.assertTransferCount(REL_ORIGINAL, 0);
testRunner.assertTransferCount(REL_RETRY, 0); testRunner.assertTransferCount(REL_RETRY, 1);
testRunner.assertTransferCount(REL_FAILURE, 1); testRunner.assertTransferCount(REL_FAILURE, 0);
MockFlowFile orgFile = testRunner.getFlowFilesForRelationship(REL_FAILURE).get(0); MockFlowFile orgFile = testRunner.getFlowFilesForRelationship(REL_RETRY).get(0);
orgFile.assertContentEquals(inputFileDataStr); orgFile.assertContentEquals(inputFileDataStr);
orgFile.assertAttributeEquals(Exception.key(), exception.getClass().getName()); orgFile.assertAttributeEquals(Exception.key(), exception.getClass().getName());
} }

View File

@ -37,6 +37,7 @@ import java.nio.charset.StandardCharsets;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import org.apache.nifi.attribute.expression.language.exception.AttributeExpressionLanguageException;
import org.apache.nifi.couchbase.CouchbaseAttributes; import org.apache.nifi.couchbase.CouchbaseAttributes;
import org.apache.nifi.couchbase.CouchbaseClusterControllerService; import org.apache.nifi.couchbase.CouchbaseClusterControllerService;
import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.flowfile.attributes.CoreAttributes;
@ -167,7 +168,10 @@ public class TestPutCouchbaseKey {
testRunner.enqueue(inFileDataBytes, properties); testRunner.enqueue(inFileDataBytes, properties);
testRunner.run(); testRunner.run();
verify(bucket, times(1)).upsert(any(RawJsonDocument.class), eq(PersistTo.NONE), eq(ReplicateTo.NONE)); ArgumentCaptor<RawJsonDocument> capture = ArgumentCaptor.forClass(RawJsonDocument.class);
verify(bucket, times(1)).upsert(capture.capture(), eq(PersistTo.NONE), eq(ReplicateTo.NONE));
assertEquals(somePropertyValue, capture.getValue().id());
assertEquals(inFileData, capture.getValue().content());
testRunner.assertTransferCount(REL_SUCCESS, 1); testRunner.assertTransferCount(REL_SUCCESS, 1);
testRunner.assertTransferCount(REL_RETRY, 0); testRunner.assertTransferCount(REL_RETRY, 0);
@ -196,9 +200,9 @@ public class TestPutCouchbaseKey {
testRunner.enqueue(inFileDataBytes, properties); testRunner.enqueue(inFileDataBytes, properties);
try { try {
testRunner.run(); testRunner.run();
fail("ProcessException should be throws."); fail("Exception should be thrown.");
} catch (AssertionError e){ } catch (AssertionError e){
Assert.assertTrue(e.getCause().getClass().equals(ProcessException.class)); Assert.assertTrue(e.getCause().getClass().equals(AttributeExpressionLanguageException.class));
} }
testRunner.assertTransferCount(REL_SUCCESS, 0); testRunner.assertTransferCount(REL_SUCCESS, 0);
@ -226,6 +230,7 @@ public class TestPutCouchbaseKey {
ArgumentCaptor<RawJsonDocument> capture = ArgumentCaptor.forClass(RawJsonDocument.class); ArgumentCaptor<RawJsonDocument> capture = ArgumentCaptor.forClass(RawJsonDocument.class);
verify(bucket, times(1)).upsert(capture.capture(), eq(PersistTo.NONE), eq(ReplicateTo.NONE)); verify(bucket, times(1)).upsert(capture.capture(), eq(PersistTo.NONE), eq(ReplicateTo.NONE));
assertEquals(uuid, capture.getValue().id()); assertEquals(uuid, capture.getValue().id());
assertEquals(inFileData, capture.getValue().content());
testRunner.assertTransferCount(REL_SUCCESS, 1); testRunner.assertTransferCount(REL_SUCCESS, 1);
testRunner.assertTransferCount(REL_RETRY, 0); testRunner.assertTransferCount(REL_RETRY, 0);
@ -253,7 +258,7 @@ public class TestPutCouchbaseKey {
testRunner.setProperty(PutCouchbaseKey.REPLICATE_TO, ReplicateTo.ONE.toString()); testRunner.setProperty(PutCouchbaseKey.REPLICATE_TO, ReplicateTo.ONE.toString());
try { try {
testRunner.run(); testRunner.run();
fail("ProcessException should be throws."); fail("ProcessException should be thrown.");
} catch (AssertionError e){ } catch (AssertionError e){
Assert.assertTrue(e.getCause().getClass().equals(ProcessException.class)); Assert.assertTrue(e.getCause().getClass().equals(ProcessException.class));
} }