NIFI-5949: Addressed problematic error handling logic in PutKudu. Also removed AbstractKudu and moved all logic into PutKudu, as the abstract class was clearly designed to be extended only by PutKudu and no other processors exist for Kudu

Added explicit .toString() call on rowError to avoid IllegalStateException during logging of row errors

This closes #3262.
This commit is contained in:
Mark Payne 2019-01-10 16:26:13 -05:00 committed by Jeff Storck
parent 4ef2251d74
commit fd3d69bc90
5 changed files with 315 additions and 374 deletions

View File

@ -77,6 +77,7 @@ import org.springframework.util.Assert;
import java.lang.management.ManagementFactory; import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo; import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean; import java.lang.management.ThreadMXBean;
import java.lang.reflect.InvocationTargetException;
import java.net.URL; import java.net.URL;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
@ -1544,9 +1545,10 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
} finally { } finally {
schedulingAgentCallback.onTaskComplete(); schedulingAgentCallback.onTaskComplete();
} }
} catch (final Exception e) { } catch (Exception e) {
final Throwable cause = (e instanceof InvocationTargetException) ? e.getCause() : e;
procLog.error("Failed to properly initialize Processor. If still scheduled to run, NiFi will attempt to " procLog.error("Failed to properly initialize Processor. If still scheduled to run, NiFi will attempt to "
+ "initialize and run the Processor again after the 'Administrative Yield Duration' has elapsed. Failure is due to " + e, e); + "initialize and run the Processor again after the 'Administrative Yield Duration' has elapsed. Failure is due to " + cause, cause);
// If processor's task completed Exceptionally, then we want to retry initiating the start (if Processor is still scheduled to run). // If processor's task completed Exceptionally, then we want to retry initiating the start (if Processor is still scheduled to run).
try (final NarCloseable nc = NarCloseable.withComponentNarLoader(getExtensionManager(), processor.getClass(), processor.getIdentifier())) { try (final NarCloseable nc = NarCloseable.withComponentNarLoader(getExtensionManager(), processor.getClass(), processor.getIdentifier())) {

View File

@ -1,304 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.kudu;
import org.apache.kudu.client.KuduClient;
import org.apache.kudu.client.KuduException;
import org.apache.kudu.client.KuduSession;
import org.apache.kudu.client.KuduTable;
import org.apache.kudu.client.Insert;
import org.apache.kudu.client.Operation;
import org.apache.kudu.client.OperationResponse;
import org.apache.kudu.client.RowError;
import org.apache.kudu.client.SessionConfiguration.FlushMode;
import org.apache.kudu.client.Upsert;
import org.apache.kudu.client.SessionConfiguration;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.record.RecordSet;
import org.apache.nifi.serialization.record.Record;
import java.io.InputStream;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
public abstract class AbstractKudu extends AbstractProcessor {
protected static final PropertyDescriptor KUDU_MASTERS = new PropertyDescriptor.Builder()
.name("Kudu Masters")
.description("List all kudu masters's ip with port (e.g. 7051), comma separated")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
protected static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
.name("Table Name")
.description("The name of the Kudu Table to put data into")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
.name("record-reader")
.displayName("Record Reader")
.description("The service for reading records from incoming flow files.")
.identifiesControllerService(RecordReaderFactory.class)
.required(true)
.build();
protected static final PropertyDescriptor SKIP_HEAD_LINE = new PropertyDescriptor.Builder()
.name("Skip head line")
.description("Deprecated. Used to ignore header lines, but this should be handled by a RecordReader " +
"(e.g. \"Treat First Line as Header\" property of CSVReader)")
.allowableValues("true", "false")
.defaultValue("false")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
protected static final PropertyDescriptor INSERT_OPERATION = new PropertyDescriptor.Builder()
.name("Insert Operation")
.description("Specify operationType for this processor. Insert-Ignore will ignore duplicated rows")
.allowableValues(OperationType.values())
.defaultValue(OperationType.INSERT.toString())
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
protected static final PropertyDescriptor FLUSH_MODE = new PropertyDescriptor.Builder()
.name("Flush Mode")
.description("Set the new flush mode for a kudu session.\n" +
"AUTO_FLUSH_SYNC: the call returns when the operation is persisted, else it throws an exception.\n" +
"AUTO_FLUSH_BACKGROUND: the call returns when the operation has been added to the buffer. This call should normally perform only fast in-memory" +
" operations but it may have to wait when the buffer is full and there's another buffer being flushed.\n" +
"MANUAL_FLUSH: the call returns when the operation has been added to the buffer, else it throws a KuduException if the buffer is full.")
.allowableValues(SessionConfiguration.FlushMode.values())
.defaultValue(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND.toString())
.required(true)
.build();
protected static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
.name("Batch Size")
.description("The maximum number of FlowFiles to process in a single execution, between 1 - 100000. " +
"Depending on your memory size, and data size per row set an appropriate batch size. " +
"Gradually increase this number to find out the best one for best performances.")
.defaultValue("100")
.required(true)
.addValidator(StandardValidators.createLongValidator(1, 100000, true))
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
protected static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("A FlowFile is routed to this relationship after it has been successfully stored in Kudu")
.build();
protected static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("A FlowFile is routed to this relationship if it cannot be sent to Kudu")
.build();
public static final String RECORD_COUNT_ATTR = "record.count";
protected String kuduMasters;
protected String tableName;
protected boolean skipHeadLine;
protected OperationType operationType;
protected SessionConfiguration.FlushMode flushMode;
protected int batchSize = 100;
protected KuduClient kuduClient;
protected KuduTable kuduTable;
@OnScheduled
public void OnScheduled(final ProcessContext context) {
try {
tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions().getValue();
kuduMasters = context.getProperty(KUDU_MASTERS).evaluateAttributeExpressions().getValue();
operationType = OperationType.valueOf(context.getProperty(INSERT_OPERATION).getValue());
batchSize = context.getProperty(BATCH_SIZE).evaluateAttributeExpressions().asInteger();
flushMode = SessionConfiguration.FlushMode.valueOf(context.getProperty(FLUSH_MODE).getValue());
skipHeadLine = context.getProperty(SKIP_HEAD_LINE).asBoolean();
if (kuduClient == null) {
getLogger().debug("Setting up Kudu connection...");
kuduClient = getKuduConnection(kuduMasters);
kuduTable = this.getKuduTable(kuduClient, tableName);
getLogger().debug("Kudu connection successfully initialized");
}
} catch(KuduException ex){
getLogger().error("Exception occurred while interacting with Kudu due to " + ex.getMessage(), ex);
}
}
@OnStopped
public final void closeClient() throws KuduException {
if (kuduClient != null) {
getLogger().info("Closing KuduClient");
kuduClient.close();
kuduClient = null;
}
}
private Stream<RowError> flushKuduSession(final KuduSession kuduSession, boolean close) throws Exception {
List<OperationResponse> responses = close ? kuduSession.close() : kuduSession.flush();
Stream<RowError> rowErrors;
if (kuduSession.getFlushMode() == FlushMode.AUTO_FLUSH_BACKGROUND) {
rowErrors = Stream.of(kuduSession.getPendingErrors().getRowErrors());
} else {
rowErrors = responses.stream()
.filter(OperationResponse::hasRowError)
.map(OperationResponse::getRowError);
}
return rowErrors;
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
final List<FlowFile> flowFiles = session.get(batchSize);
if (flowFiles.isEmpty()) return;
final KuduSession kuduSession = getKuduSession(kuduClient);
final RecordReaderFactory recordReaderFactory =
context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
final Map<FlowFile, Integer> numRecords = new HashMap<>();
final Map<FlowFile, Object> flowFileFailures = new HashMap<>();
final Map<Operation, FlowFile> operationFlowFileMap = new HashMap<>();
int numBuffered = 0;
Stream<RowError> pendingRowErrors = Stream.empty();
for (FlowFile flowFile : flowFiles) {
try (final InputStream in = session.read(flowFile);
final RecordReader recordReader = recordReaderFactory.createRecordReader(flowFile, in, getLogger())) {
final List<String> fieldNames = recordReader.getSchema().getFieldNames();
final RecordSet recordSet = recordReader.createRecordSet();
// Deprecated
if (skipHeadLine) recordSet.next();
Record record = recordSet.next();
while (record != null) {
Operation operation = operationType == OperationType.UPSERT
? upsertRecordToKudu(kuduTable, record, fieldNames)
: insertRecordToKudu(kuduTable, record, fieldNames);
// We keep track of mappings between Operations and their origins,
// so that we know which FlowFiles should be marked failure after buffered flush.
operationFlowFileMap.put(operation, flowFile);
// Flush mutation buffer of KuduSession to avoid "MANUAL_FLUSH is enabled
// but the buffer is too big" error. This can happen when flush mode is
// MANUAL_FLUSH and a FlowFile has more than one records.
if (numBuffered == batchSize && flushMode == FlushMode.MANUAL_FLUSH) {
numBuffered = 0;
pendingRowErrors = Stream.concat(pendingRowErrors, flushKuduSession(kuduSession, false));
}
// OperationResponse is returned only when flush mode is set to AUTO_FLUSH_SYNC
OperationResponse response = kuduSession.apply(operation);
if (response != null && response.hasRowError()) {
// Stop processing the records on the first error.
// Note that Kudu does not support rolling back of previous operations.
flowFileFailures.put(flowFile, response.getRowError());
break;
}
numBuffered++;
numRecords.merge(flowFile, 1, Integer::sum);
record = recordSet.next();
}
} catch (Exception ex) {
flowFileFailures.put(flowFile, ex);
}
}
try {
// Find RowErrors for each FlowFile
Map<FlowFile, List<RowError>> flowFileRowErrors =
Stream.concat(
pendingRowErrors,
numBuffered > 0 ? flushKuduSession(kuduSession, true) : Stream.empty()
).collect(Collectors.groupingBy(e -> operationFlowFileMap.get(e.getOperation())));
flowFiles.forEach(ff -> {
int count = numRecords.getOrDefault(ff, 0);
List<RowError> rowErrors = flowFileRowErrors.get(ff);
if (rowErrors != null) {
rowErrors.forEach(rowError ->
getLogger().error("Failed to write due to {}", new Object[]{rowError}));
session.putAttribute(ff, RECORD_COUNT_ATTR, String.valueOf(count - rowErrors.size()));
session.transfer(ff, REL_FAILURE);
} else {
session.putAttribute(ff, RECORD_COUNT_ATTR, String.valueOf(count));
if (flowFileFailures.containsKey(ff)) {
getLogger().error("Failed to write due to {}", new Object[]{flowFileFailures.get(ff)});
session.transfer(ff, REL_FAILURE);
} else {
session.transfer(ff, REL_SUCCESS);
session.getProvenanceReporter().send(ff, "Successfully added FlowFile to Kudu");
}
}
});
} catch (Exception ex) {
throw new ProcessException(ex);
}
}
protected KuduClient getKuduConnection(String masters) {
return new KuduClient.KuduClientBuilder(kuduMasters).build();
}
protected KuduTable getKuduTable(KuduClient client, String tableName) throws KuduException {
return client.openTable(tableName);
}
protected KuduSession getKuduSession(KuduClient client){
KuduSession kuduSession = client.newSession();
kuduSession.setMutationBufferSpace(batchSize);
kuduSession.setFlushMode(flushMode);
if(operationType == OperationType.INSERT_IGNORE){
kuduSession.setIgnoreAllDuplicateRows(true);
}
return kuduSession;
}
protected abstract Insert insertRecordToKudu(final KuduTable table, final Record record, final List<String> fields) throws Exception;
protected abstract Upsert upsertRecordToKudu(final KuduTable table, final Record record, final List<String> fields) throws Exception;
}

View File

@ -17,31 +17,53 @@
package org.apache.nifi.processors.kudu; package org.apache.nifi.processors.kudu;
import com.google.common.annotations.VisibleForTesting;
import org.apache.kudu.ColumnSchema; import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Schema; import org.apache.kudu.Schema;
import org.apache.kudu.Type; import org.apache.kudu.Type;
import org.apache.kudu.client.Insert; import org.apache.kudu.client.Insert;
import org.apache.kudu.client.Upsert; import org.apache.kudu.client.KuduClient;
import org.apache.kudu.client.PartialRow; import org.apache.kudu.client.KuduException;
import org.apache.kudu.client.KuduSession;
import org.apache.kudu.client.KuduTable; import org.apache.kudu.client.KuduTable;
import org.apache.kudu.client.Operation;
import org.apache.kudu.client.OperationResponse;
import org.apache.kudu.client.PartialRow;
import org.apache.kudu.client.RowError;
import org.apache.kudu.client.SessionConfiguration;
import org.apache.kudu.client.Upsert;
import org.apache.nifi.annotation.behavior.EventDriven; import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SupportsBatching; import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.record.Record; import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSet;
import com.google.common.annotations.VisibleForTesting; import java.io.InputStream;
import java.math.BigDecimal; import java.math.BigDecimal;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.stream.Collectors;
@EventDriven @EventDriven
@SupportsBatching @SupportsBatching
@ -51,7 +73,89 @@ import java.util.Set;
"to the specified Kudu's table. The schema for the table must be provided in the processor properties or from your source." + "to the specified Kudu's table. The schema for the table must be provided in the processor properties or from your source." +
" If any error occurs while reading records from the input, or writing records to Kudu, the FlowFile will be routed to failure") " If any error occurs while reading records from the input, or writing records to Kudu, the FlowFile will be routed to failure")
@WritesAttribute(attribute = "record.count", description = "Number of records written to Kudu") @WritesAttribute(attribute = "record.count", description = "Number of records written to Kudu")
public class PutKudu extends AbstractKudu { public class PutKudu extends AbstractProcessor {
protected static final PropertyDescriptor KUDU_MASTERS = new PropertyDescriptor.Builder()
.name("Kudu Masters")
.description("List all kudu masters's ip with port (e.g. 7051), comma separated")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
protected static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
.name("Table Name")
.description("The name of the Kudu Table to put data into")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
.name("record-reader")
.displayName("Record Reader")
.description("The service for reading records from incoming flow files.")
.identifiesControllerService(RecordReaderFactory.class)
.required(true)
.build();
protected static final PropertyDescriptor SKIP_HEAD_LINE = new PropertyDescriptor.Builder()
.name("Skip head line")
.description("Deprecated. Used to ignore header lines, but this should be handled by a RecordReader " +
"(e.g. \"Treat First Line as Header\" property of CSVReader)")
.allowableValues("true", "false")
.defaultValue("false")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
protected static final PropertyDescriptor INSERT_OPERATION = new PropertyDescriptor.Builder()
.name("Insert Operation")
.description("Specify operationType for this processor. Insert-Ignore will ignore duplicated rows")
.allowableValues(OperationType.values())
.defaultValue(OperationType.INSERT.toString())
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
protected static final PropertyDescriptor FLUSH_MODE = new PropertyDescriptor.Builder()
.name("Flush Mode")
.description("Set the new flush mode for a kudu session.\n" +
"AUTO_FLUSH_SYNC: the call returns when the operation is persisted, else it throws an exception.\n" +
"AUTO_FLUSH_BACKGROUND: the call returns when the operation has been added to the buffer. This call should normally perform only fast in-memory" +
" operations but it may have to wait when the buffer is full and there's another buffer being flushed.\n" +
"MANUAL_FLUSH: the call returns when the operation has been added to the buffer, else it throws a KuduException if the buffer is full.")
.allowableValues(SessionConfiguration.FlushMode.values())
.defaultValue(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND.toString())
.required(true)
.build();
protected static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
.name("Batch Size")
.description("The maximum number of FlowFiles to process in a single execution, between 1 - 100000. " +
"Depending on your memory size, and data size per row set an appropriate batch size. " +
"Gradually increase this number to find out the best one for best performances.")
.defaultValue("100")
.required(true)
.addValidator(StandardValidators.createLongValidator(1, 100000, true))
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
protected static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("A FlowFile is routed to this relationship after it has been successfully stored in Kudu")
.build();
protected static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("A FlowFile is routed to this relationship if it cannot be sent to Kudu")
.build();
public static final String RECORD_COUNT_ATTR = "record.count";
protected OperationType operationType;
protected SessionConfiguration.FlushMode flushMode;
protected int batchSize = 100;
protected KuduClient kuduClient;
protected KuduTable kuduTable;
@Override @Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
@ -75,14 +179,169 @@ public class PutKudu extends AbstractKudu {
return rels; return rels;
} }
@OnScheduled
public void OnScheduled(final ProcessContext context) throws KuduException {
final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions().getValue();
final String kuduMasters = context.getProperty(KUDU_MASTERS).evaluateAttributeExpressions().getValue();
operationType = OperationType.valueOf(context.getProperty(INSERT_OPERATION).getValue());
batchSize = context.getProperty(BATCH_SIZE).evaluateAttributeExpressions().asInteger();
flushMode = SessionConfiguration.FlushMode.valueOf(context.getProperty(FLUSH_MODE).getValue());
getLogger().debug("Setting up Kudu connection...");
kuduClient = createClient(kuduMasters);
kuduTable = kuduClient.openTable(tableName);
getLogger().debug("Kudu connection successfully initialized");
}
protected KuduClient createClient(final String masters) {
return new KuduClient.KuduClientBuilder(masters).build();
}
@OnStopped
public final void closeClient() throws KuduException {
if (kuduClient != null) {
getLogger().debug("Closing KuduClient");
kuduClient.close();
kuduClient = null;
}
}
@Override @Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
final List<FlowFile> flowFiles = session.get(batchSize);
if (flowFiles.isEmpty()) {
return;
}
final KuduSession kuduSession = getKuduSession(kuduClient);
final RecordReaderFactory recordReaderFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
final Map<FlowFile, Integer> numRecords = new HashMap<>();
final Map<FlowFile, Object> flowFileFailures = new HashMap<>();
final Map<Operation, FlowFile> operationFlowFileMap = new HashMap<>();
int numBuffered = 0;
final List<RowError> pendingRowErrors = new ArrayList<>();
for (FlowFile flowFile : flowFiles) {
try (final InputStream in = session.read(flowFile);
final RecordReader recordReader = recordReaderFactory.createRecordReader(flowFile, in, getLogger())) {
final List<String> fieldNames = recordReader.getSchema().getFieldNames();
final RecordSet recordSet = recordReader.createRecordSet();
Record record = recordSet.next();
while (record != null) {
Operation operation = operationType == OperationType.UPSERT
? upsertRecordToKudu(kuduTable, record, fieldNames)
: insertRecordToKudu(kuduTable, record, fieldNames);
// We keep track of mappings between Operations and their origins,
// so that we know which FlowFiles should be marked failure after buffered flush.
operationFlowFileMap.put(operation, flowFile);
// Flush mutation buffer of KuduSession to avoid "MANUAL_FLUSH is enabled
// but the buffer is too big" error. This can happen when flush mode is
// MANUAL_FLUSH and a FlowFile has more than one records.
if (numBuffered == batchSize && flushMode == SessionConfiguration.FlushMode.MANUAL_FLUSH) {
numBuffered = 0;
flushKuduSession(kuduSession, false, pendingRowErrors);
}
// OperationResponse is returned only when flush mode is set to AUTO_FLUSH_SYNC
OperationResponse response = kuduSession.apply(operation);
if (response != null && response.hasRowError()) {
// Stop processing the records on the first error.
// Note that Kudu does not support rolling back of previous operations.
flowFileFailures.put(flowFile, response.getRowError());
break;
}
numBuffered++;
numRecords.merge(flowFile, 1, Integer::sum);
record = recordSet.next();
}
} catch (Exception ex) {
flowFileFailures.put(flowFile, ex);
}
}
if (numBuffered > 0) {
try {
flushKuduSession(kuduSession, true, pendingRowErrors);
} catch (final Exception e) {
getLogger().error("Failed to flush/close Kudu Session", e);
for (final FlowFile flowFile : flowFiles) {
session.transfer(flowFile, REL_FAILURE);
}
return;
}
}
// Find RowErrors for each FlowFile
final Map<FlowFile, List<RowError>> flowFileRowErrors = pendingRowErrors.stream().collect(
Collectors.groupingBy(e -> operationFlowFileMap.get(e.getOperation())));
long totalCount = 0L;
for (final FlowFile flowFile : flowFiles) {
final int count = numRecords.getOrDefault(flowFile, 0);
totalCount += count;
final List<RowError> rowErrors = flowFileRowErrors.get(flowFile);
if (rowErrors != null) {
rowErrors.forEach(rowError -> getLogger().error("Failed to write due to {}", new Object[]{rowError.toString()}));
session.putAttribute(flowFile, RECORD_COUNT_ATTR, String.valueOf(count - rowErrors.size()));
session.transfer(flowFile, REL_FAILURE);
} else {
session.putAttribute(flowFile, RECORD_COUNT_ATTR, String.valueOf(count));
if (flowFileFailures.containsKey(flowFile)) {
getLogger().error("Failed to write due to {}", new Object[]{flowFileFailures.get(flowFile)});
session.transfer(flowFile, REL_FAILURE);
} else {
session.transfer(flowFile, REL_SUCCESS);
session.getProvenanceReporter().send(flowFile, "Successfully added FlowFile to Kudu");
}
}
}
session.adjustCounter("Records Inserted", totalCount, false);
}
protected KuduSession getKuduSession(final KuduClient client) {
final KuduSession kuduSession = client.newSession();
kuduSession.setMutationBufferSpace(batchSize);
kuduSession.setFlushMode(flushMode);
if (operationType == OperationType.INSERT_IGNORE) {
kuduSession.setIgnoreAllDuplicateRows(true);
}
return kuduSession;
}
private void flushKuduSession(final KuduSession kuduSession, boolean close, final List<RowError> rowErrors) throws KuduException {
final List<OperationResponse> responses = close ? kuduSession.close() : kuduSession.flush();
if (kuduSession.getFlushMode() == SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND) {
rowErrors.addAll(Arrays.asList(kuduSession.getPendingErrors().getRowErrors()));
} else {
responses.stream()
.filter(OperationResponse::hasRowError)
.map(OperationResponse::getRowError)
.forEach(rowErrors::add);
}
}
protected Upsert upsertRecordToKudu(KuduTable kuduTable, Record record, List<String> fieldNames) throws IllegalStateException, Exception { protected Upsert upsertRecordToKudu(KuduTable kuduTable, Record record, List<String> fieldNames) throws IllegalStateException, Exception {
Upsert upsert = kuduTable.newUpsert(); Upsert upsert = kuduTable.newUpsert();
this.buildPartialRow(kuduTable.getSchema(), upsert.getRow(), record, fieldNames); this.buildPartialRow(kuduTable.getSchema(), upsert.getRow(), record, fieldNames);
return upsert; return upsert;
} }
@Override
protected Insert insertRecordToKudu(KuduTable kuduTable, Record record, List<String> fieldNames) throws IllegalStateException, Exception { protected Insert insertRecordToKudu(KuduTable kuduTable, Record record, List<String> fieldNames) throws IllegalStateException, Exception {
Insert insert = kuduTable.newInsert(); Insert insert = kuduTable.newInsert();
this.buildPartialRow(kuduTable.getSchema(), insert.getRow(), record, fieldNames); this.buildPartialRow(kuduTable.getSchema(), insert.getRow(), record, fieldNames);

View File

@ -17,20 +17,20 @@
package org.apache.nifi.processors.kudu; package org.apache.nifi.processors.kudu;
import org.apache.kudu.client.Insert;
import org.apache.kudu.client.KuduClient; import org.apache.kudu.client.KuduClient;
import org.apache.kudu.client.KuduSession; import org.apache.kudu.client.KuduSession;
import org.apache.kudu.client.KuduTable; import org.apache.kudu.client.KuduTable;
import org.apache.kudu.client.KuduException;
import org.apache.kudu.client.Insert;
import org.apache.kudu.client.Upsert; import org.apache.kudu.client.Upsert;
import org.apache.nifi.serialization.record.Record; import org.apache.nifi.serialization.record.Record;
import java.util.Arrays; import java.util.Arrays;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class MockPutKudu extends PutKudu { public class MockPutKudu extends PutKudu {
private KuduSession session; private KuduSession session;
@ -61,17 +61,20 @@ public class MockPutKudu extends PutKudu {
} }
@Override @Override
protected KuduClient getKuduConnection(String masters) { protected KuduClient createClient(final String masters) {
return mock(KuduClient.class); final KuduClient client = mock(KuduClient.class);
try {
when(client.openTable(anyString())).thenReturn(mock(KuduTable.class));
} catch (final Exception e) {
}
return client;
} }
@Override @Override
protected KuduSession getKuduSession(KuduClient client){ protected KuduSession getKuduSession(KuduClient client) {
return session; return session;
} }
@Override
protected KuduTable getKuduTable(KuduClient client, String tableName) throws KuduException {
return mock(KuduTable.class);
}
} }

View File

@ -17,7 +17,6 @@
package org.apache.nifi.processors.kudu; package org.apache.nifi.processors.kudu;
import java.util.stream.Collectors;
import org.apache.kudu.ColumnSchema.ColumnSchemaBuilder; import org.apache.kudu.ColumnSchema.ColumnSchemaBuilder;
import org.apache.kudu.ColumnTypeAttributes; import org.apache.kudu.ColumnTypeAttributes;
import org.apache.kudu.Schema; import org.apache.kudu.Schema;
@ -41,14 +40,13 @@ import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory; import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.SimpleRecordSchema; import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.MapRecord; import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.MockRecordParser;
import org.apache.nifi.serialization.record.RecordField; import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType; import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.MockRecordParser;
import org.apache.nifi.serialization.record.RecordSchema; import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners; import org.apache.nifi.util.TestRunners;
import org.apache.nifi.util.Tuple; import org.apache.nifi.util.Tuple;
import org.junit.After; import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
@ -59,23 +57,23 @@ import org.mockito.stubbing.OngoingStubbing;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.util.Arrays;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream; import java.util.stream.IntStream;
import static org.apache.nifi.processors.kudu.TestPutKudu.ResultCode.EXCEPTION;
import static org.apache.nifi.processors.kudu.TestPutKudu.ResultCode.FAIL;
import static org.apache.nifi.processors.kudu.TestPutKudu.ResultCode.OK;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import static org.apache.nifi.processors.kudu.TestPutKudu.ResultCode.OK;
import static org.apache.nifi.processors.kudu.TestPutKudu.ResultCode.FAIL;
import static org.apache.nifi.processors.kudu.TestPutKudu.ResultCode.EXCEPTION;
public class TestPutKudu { public class TestPutKudu {
public static final String DEFAULT_TABLE_NAME = "Nifi-Kudu-Table"; public static final String DEFAULT_TABLE_NAME = "Nifi-Kudu-Table";
@ -246,23 +244,6 @@ public class TestPutKudu {
testRunner.assertAllFlowFilesTransferred(PutKudu.REL_FAILURE, 1); testRunner.assertAllFlowFilesTransferred(PutKudu.REL_FAILURE, 1);
} }
@Test
public void testSkipHeadLineTrue() throws InitializationException, IOException {
createRecordReader(100);
testRunner.setProperty(PutKudu.SKIP_HEAD_LINE, "true");
final String filename = "testSkipHeadLineTrue-" + System.currentTimeMillis();
final Map<String,String> flowFileAttributes = new HashMap<>();
flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename);
testRunner.enqueue("trigger", flowFileAttributes);
testRunner.run();
testRunner.assertAllFlowFilesTransferred(PutKudu.REL_SUCCESS, 1);
MockFlowFile flowFiles = testRunner.getFlowFilesForRelationship(PutKudu.REL_SUCCESS).get(0);
flowFiles.assertAttributeEquals(PutKudu.RECORD_COUNT_ATTR, "99");
}
@Test @Test
public void testInsertManyFlowFiles() throws Exception { public void testInsertManyFlowFiles() throws Exception {