From 2232e280523f47daebf3ce5ff5f0080261d78928 Mon Sep 17 00:00:00 2001
From: Mark Payne
Date: Tue, 27 Oct 2020 21:18:36 -0400
Subject: [PATCH] NIFI-7956: This closes #4626. Added option of rolling back
 session on error instead of routing to failure for PutKudu

Signed-off-by: Joe Witt
---
 .../apache/nifi/processors/kudu/PutKudu.java | 58 ++++++++++++++++++-
 1 file changed, 57 insertions(+), 1 deletion(-)

diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
index c811b6b441..1a649b0433 100644
--- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
+++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
@@ -34,6 +34,7 @@ import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.PropertyDescriptor.Builder;
 import org.apache.nifi.components.PropertyValue;
@@ -89,6 +90,12 @@ import static org.apache.nifi.expression.ExpressionLanguageScope.VARIABLE_REGIST
 
 public class PutKudu extends AbstractKuduProcessor {
 
+    static final AllowableValue FAILURE_STRATEGY_ROUTE = new AllowableValue("route-to-failure", "Route to Failure",
+        "The FlowFile containing the Records that failed to insert will be routed to the 'failure' relationship");
+    static final AllowableValue FAILURE_STRATEGY_ROLLBACK = new AllowableValue("rollback", "Rollback Session",
+        "If any Record cannot be inserted, all FlowFiles in the session will be rolled back to their input queue. This means that if data cannot be pushed, " +
+            "it will block any subsequent data from be pushed to Kudu as well until the issue is resolved. However, this may be advantageous if a strict ordering is required.");
+
     protected static final PropertyDescriptor TABLE_NAME = new Builder()
         .name("Table Name")
         .description("The name of the Kudu Table to put data into")
@@ -106,6 +113,15 @@ public class PutKudu extends AbstractKuduProcessor {
         .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
         .build();
 
+    static final PropertyDescriptor FAILURE_STRATEGY = new Builder()
+        .name("Failure Strategy")
+        .displayName("Failure Strategy")
+        .description("If one or more Records in a batch cannot be transferred to Kudu, specifies how to handle the failure")
+        .required(true)
+        .allowableValues(FAILURE_STRATEGY_ROUTE, FAILURE_STRATEGY_ROLLBACK)
+        .defaultValue(FAILURE_STRATEGY_ROUTE.getValue())
+        .build();
+
     protected static final PropertyDescriptor SKIP_HEAD_LINE = new Builder()
         .name("Skip head line")
         .description("Deprecated. Used to ignore header lines, but this should be handled by a RecordReader " +
@@ -255,12 +271,14 @@ public class PutKudu extends AbstractKuduProcessor {
     private volatile SessionConfiguration.FlushMode flushMode;
     private volatile Function<Record, OperationType> recordPathOperationType;
     private volatile RecordPath dataRecordPath;
+    private volatile String failureStrategy;
 
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
         final List<PropertyDescriptor> properties = new ArrayList<>();
         properties.add(KUDU_MASTERS);
         properties.add(TABLE_NAME);
+        properties.add(FAILURE_STRATEGY);
         properties.add(KERBEROS_CREDENTIALS_SERVICE);
         properties.add(KERBEROS_PRINCIPAL);
         properties.add(KERBEROS_PASSWORD);
@@ -305,6 +323,12 @@ public class PutKudu extends AbstractKuduProcessor {
         final String dataRecordPathValue = context.getProperty(DATA_RECORD_PATH).getValue();
         dataRecordPath = dataRecordPathValue == null ?
             null : RecordPath.compile(dataRecordPathValue);
+
+        failureStrategy = context.getProperty(FAILURE_STRATEGY).getValue();
+    }
+
+    private boolean isRollbackOnFailure() {
+        return FAILURE_STRATEGY_ROLLBACK.getValue().equalsIgnoreCase(failureStrategy);
     }
 
     @Override
@@ -462,10 +486,20 @@
                     record = recordSet.next();
                 }
             } catch (Exception ex) {
+                getLogger().error("Failed to push {} to Kudu", new Object[] {flowFile}, ex);
                 flowFileFailures.put(flowFile, ex);
             }
         }
 
+        // If configured to rollback on failure, and there's at least one error, rollback the session and return.
+        if (isRollbackOnFailure() && (!pendingRowErrors.isEmpty() || !flowFileFailures.isEmpty())) {
+            logFailures(pendingRowErrors, operationFlowFileMap);
+            session.rollback();
+            context.yield();
+            return;
+        }
+
+        // If any data is buffered, flush it.
         if (numBuffered > 0) {
             try {
                 flushKuduSession(kuduSession, true, pendingRowErrors);
@@ -479,6 +513,15 @@
             }
         }
 
+        // It's possible that there were no row errors when this was checked above, but flushing the Kudu session may have introduced
+        // one or more Row Errors. So we need to check again.
+        if (isRollbackOnFailure() && !pendingRowErrors.isEmpty()) {
+            logFailures(pendingRowErrors, operationFlowFileMap);
+            session.rollback();
+            context.yield();
+            return;
+        }
+
         // Find RowErrors for each FlowFile
         final Map<FlowFile, List<RowError>> flowFileRowErrors = pendingRowErrors.stream().collect(
             Collectors.groupingBy(e -> operationFlowFileMap.get(e.getOperation())));
@@ -490,8 +533,9 @@
 
             final List<RowError> rowErrors = flowFileRowErrors.get(flowFile);
             if (rowErrors != null) {
-                rowErrors.forEach(rowError -> getLogger().error("Failed to write due to {}", new Object[]{rowError.toString()}));
+                rowErrors.forEach(rowError -> getLogger().error("Failed to write due to {}", new Object[] {rowError.toString()}));
                 session.putAttribute(flowFile, RECORD_COUNT_ATTR, String.valueOf(count - rowErrors.size()));
+                totalCount -= rowErrors.size(); // Don't include error rows in the the counter.
                 session.transfer(flowFile, REL_FAILURE);
             } else {
                 session.putAttribute(flowFile, RECORD_COUNT_ATTR, String.valueOf(count));
@@ -509,6 +553,18 @@
         session.adjustCounter("Records Inserted", totalCount, false);
     }
 
+    private void logFailures(final List<RowError> pendingRowErrors, final Map<Operation, FlowFile> operationFlowFileMap) {
+        final Map<FlowFile, List<RowError>> flowFileRowErrors = pendingRowErrors.stream().collect(
+            Collectors.groupingBy(e -> operationFlowFileMap.get(e.getOperation())));
+
+        for (final Map.Entry<FlowFile, List<RowError>> entry : flowFileRowErrors.entrySet()) {
+            final FlowFile flowFile = entry.getKey();
+            final List<RowError> errors = entry.getValue();
+
+            getLogger().error("Could not write {} to Kudu due to: {}", new Object[] {flowFile, errors});
+        }
+    }
+
     private String getEvaluatedProperty(PropertyDescriptor property, ProcessContext context, FlowFile flowFile) {
         PropertyValue evaluatedProperty = context.getProperty(property).evaluateAttributeExpressions(flowFile);
         if (property.isRequired() && evaluatedProperty == null) {