NIFI-7956: This closes #4626. Added option of rolling back session on error instead of routing to failure for PutKudu

Signed-off-by: Joe Witt <joewitt@apache.org>
This commit is contained in:
Mark Payne 2020-10-27 21:18:36 -04:00 committed by Joe Witt
parent c610aab3cb
commit 2232e28052
No known key found for this signature in database
GPG Key ID: 9093BF854F811A1A
1 changed files with 57 additions and 1 deletions

View File

@ -34,6 +34,7 @@ import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyDescriptor.Builder;
import org.apache.nifi.components.PropertyValue;
@ -89,6 +90,12 @@ import static org.apache.nifi.expression.ExpressionLanguageScope.VARIABLE_REGIST
public class PutKudu extends AbstractKuduProcessor {
static final AllowableValue FAILURE_STRATEGY_ROUTE = new AllowableValue("route-to-failure", "Route to Failure",
"The FlowFile containing the Records that failed to insert will be routed to the 'failure' relationship");
static final AllowableValue FAILURE_STRATEGY_ROLLBACK = new AllowableValue("rollback", "Rollback Session",
"If any Record cannot be inserted, all FlowFiles in the session will be rolled back to their input queue. This means that if data cannot be pushed, " +
"it will block any subsequent data from be pushed to Kudu as well until the issue is resolved. However, this may be advantageous if a strict ordering is required.");
protected static final PropertyDescriptor TABLE_NAME = new Builder()
.name("Table Name")
.description("The name of the Kudu Table to put data into")
@ -106,6 +113,15 @@ public class PutKudu extends AbstractKuduProcessor {
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
static final PropertyDescriptor FAILURE_STRATEGY = new Builder()
.name("Failure Strategy")
.displayName("Failure Strategy")
.description("If one or more Records in a batch cannot be transferred to Kudu, specifies how to handle the failure")
.required(true)
.allowableValues(FAILURE_STRATEGY_ROUTE, FAILURE_STRATEGY_ROLLBACK)
.defaultValue(FAILURE_STRATEGY_ROUTE.getValue())
.build();
protected static final PropertyDescriptor SKIP_HEAD_LINE = new Builder()
.name("Skip head line")
.description("Deprecated. Used to ignore header lines, but this should be handled by a RecordReader " +
@ -255,12 +271,14 @@ public class PutKudu extends AbstractKuduProcessor {
private volatile SessionConfiguration.FlushMode flushMode;
private volatile Function<Record, OperationType> recordPathOperationType;
private volatile RecordPath dataRecordPath;
private volatile String failureStrategy;
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(KUDU_MASTERS);
properties.add(TABLE_NAME);
properties.add(FAILURE_STRATEGY);
properties.add(KERBEROS_CREDENTIALS_SERVICE);
properties.add(KERBEROS_PRINCIPAL);
properties.add(KERBEROS_PASSWORD);
@ -305,6 +323,12 @@ public class PutKudu extends AbstractKuduProcessor {
final String dataRecordPathValue = context.getProperty(DATA_RECORD_PATH).getValue();
dataRecordPath = dataRecordPathValue == null ? null : RecordPath.compile(dataRecordPathValue);
failureStrategy = context.getProperty(FAILURE_STRATEGY).getValue();
}
private boolean isRollbackOnFailure() {
return FAILURE_STRATEGY_ROLLBACK.getValue().equalsIgnoreCase(failureStrategy);
}
@Override
@ -462,10 +486,20 @@ public class PutKudu extends AbstractKuduProcessor {
record = recordSet.next();
}
} catch (Exception ex) {
getLogger().error("Failed to push {} to Kudu", new Object[] {flowFile}, ex);
flowFileFailures.put(flowFile, ex);
}
}
// If configured to rollback on failure, and there's at least one error, rollback the session and return.
if (isRollbackOnFailure() && (!pendingRowErrors.isEmpty() || !flowFileFailures.isEmpty())) {
logFailures(pendingRowErrors, operationFlowFileMap);
session.rollback();
context.yield();
return;
}
// If any data is buffered, flush it.
if (numBuffered > 0) {
try {
flushKuduSession(kuduSession, true, pendingRowErrors);
@ -479,6 +513,15 @@ public class PutKudu extends AbstractKuduProcessor {
}
}
// It's possible that there were no row errors when this was checked above, but flushing the Kudu session may have introduced
// one or more Row Errors. So we need to check again.
if (isRollbackOnFailure() && !pendingRowErrors.isEmpty()) {
logFailures(pendingRowErrors, operationFlowFileMap);
session.rollback();
context.yield();
return;
}
// Find RowErrors for each FlowFile
final Map<FlowFile, List<RowError>> flowFileRowErrors = pendingRowErrors.stream().collect(
Collectors.groupingBy(e -> operationFlowFileMap.get(e.getOperation())));
@ -490,8 +533,9 @@ public class PutKudu extends AbstractKuduProcessor {
final List<RowError> rowErrors = flowFileRowErrors.get(flowFile);
if (rowErrors != null) {
rowErrors.forEach(rowError -> getLogger().error("Failed to write due to {}", new Object[]{rowError.toString()}));
rowErrors.forEach(rowError -> getLogger().error("Failed to write due to {}", new Object[] {rowError.toString()}));
session.putAttribute(flowFile, RECORD_COUNT_ATTR, String.valueOf(count - rowErrors.size()));
totalCount -= rowErrors.size(); // Don't include error rows in the the counter.
session.transfer(flowFile, REL_FAILURE);
} else {
session.putAttribute(flowFile, RECORD_COUNT_ATTR, String.valueOf(count));
@ -509,6 +553,18 @@ public class PutKudu extends AbstractKuduProcessor {
session.adjustCounter("Records Inserted", totalCount, false);
}
private void logFailures(final List<RowError> pendingRowErrors, final Map<Operation, FlowFile> operationFlowFileMap) {
final Map<FlowFile, List<RowError>> flowFileRowErrors = pendingRowErrors.stream().collect(
Collectors.groupingBy(e -> operationFlowFileMap.get(e.getOperation())));
for (final Map.Entry<FlowFile, List<RowError>> entry : flowFileRowErrors.entrySet()) {
final FlowFile flowFile = entry.getKey();
final List<RowError> errors = entry.getValue();
getLogger().error("Could not write {} to Kudu due to: {}", new Object[] {flowFile, errors});
}
}
private String getEvaluatedProperty(PropertyDescriptor property, ProcessContext context, FlowFile flowFile) {
PropertyValue evaluatedProperty = context.getProperty(property).evaluateAttributeExpressions(flowFile);
if (property.isRequired() && evaluatedProperty == null) {