NIFI-12700: refactored PutKudu to optimize memory handling for AUTO_FLUSH_SYNC flush mode (unbatched flush)

NIFI-12700: made changes based on PR comments. Simplified statements involving determination of whether or not there are flowfile failures/rowErrors. Separated out getting rowErrors from OperationResponses into its own function

Signed-off-by: Matt Burgess <mattyb149@apache.org>

This closes #8322
Authored by emiliosetiadarma on 2024-01-22 01:28:09 -08:00; committed by Matt Burgess
parent 42bd5243bb
commit 3719fddf84
5 changed files with 388 additions and 67 deletions

AbstractKuduProcessor.java

@@ -22,6 +22,7 @@ import java.sql.Date;
 import java.sql.Timestamp;
 import java.time.LocalDate;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.Executor;
@@ -35,6 +36,8 @@ import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.Consumer;
+import java.util.stream.Collectors;
 import org.apache.kudu.ColumnSchema;
 import org.apache.kudu.ColumnTypeAttributes;
 import org.apache.kudu.Schema;
@@ -217,19 +220,33 @@ public abstract class AbstractKuduProcessor extends AbstractProcessor {
         }
     }

-    protected void flushKuduSession(final KuduSession kuduSession, boolean close, final List<RowError> rowErrors) throws KuduException {
-        final List<OperationResponse> responses = close ? kuduSession.close() : kuduSession.flush();
+    /**
+     * Get the pending errors from the active {@link KuduSession}. This will only be applicable if the flushMode is
+     * {@code SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND}.
+     * @return a {@link List} of pending {@link RowError}s
+     */
+    protected List<RowError> getPendingRowErrorsFromKuduSession(final KuduSession kuduSession) {
         if (kuduSession.getFlushMode() == SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND) {
-            rowErrors.addAll(Arrays.asList(kuduSession.getPendingErrors().getRowErrors()));
+            return Arrays.asList(kuduSession.getPendingErrors().getRowErrors());
         } else {
-            responses.stream()
-                    .filter(OperationResponse::hasRowError)
-                    .map(OperationResponse::getRowError)
-                    .forEach(rowErrors::add);
+            return Collections.EMPTY_LIST;
         }
     }

+    protected List<RowError> flushKuduSession(final KuduSession kuduSession) throws KuduException {
+        final List<OperationResponse> responses = kuduSession.flush();
+        // RowErrors will only be present in the OperationResponses in this case if the flush mode
+        // selected is MANUAL_FLUSH. It will be empty otherwise.
+        return getRowErrors(responses);
+    }
+
+    protected List<RowError> closeKuduSession(final KuduSession kuduSession) throws KuduException {
+        final List<OperationResponse> responses = kuduSession.close();
+        // RowErrors will only be present in the OperationResponses in this case if the flush mode
+        // selected is MANUAL_FLUSH, since the underlying implementation of kuduSession.close() returns
+        // the OperationResponses from a flush() call.
+        return getRowErrors(responses);
+    }
+
     @OnStopped
     public void shutdown() throws Exception {
@@ -410,4 +427,11 @@ public abstract class AbstractKuduProcessor extends AbstractProcessor {
             return String.format("PutKudu[%s]-client-%d", identifier, threadCount.getAndIncrement());
         }
     }
+
+    private List<RowError> getRowErrors(final List<OperationResponse> responses) {
+        return responses.stream()
+                .filter(OperationResponse::hasRowError)
+                .map(OperationResponse::getRowError)
+                .collect(Collectors.toList());
+    }
 }
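Note: the refactor splits error retrieval by flush mode. AUTO_FLUSH_BACKGROUND surfaces RowErrors through the session's pending-error accumulator, while MANUAL_FLUSH returns them in the OperationResponses of flush() and close(). A minimal sketch of how a caller can combine the new helpers (the collectSessionErrors name is illustrative, not part of this commit):

    // Illustrative sketch, assuming it lives alongside the helpers above.
    private List<RowError> collectSessionErrors(final KuduSession kuduSession) throws KuduException {
        // For MANUAL_FLUSH, the RowErrors come back in the OperationResponses.
        final List<RowError> flushErrors = flushKuduSession(kuduSession);
        if (kuduSession.getFlushMode() == SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND) {
            // Background flushes report errors via getPendingErrors() instead.
            return getPendingRowErrorsFromKuduSession(kuduSession);
        }
        return flushErrors;
    }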

AutoFlushSyncPutKuduResult.java

@@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.kudu;
import org.apache.kudu.client.Operation;
import org.apache.kudu.client.RowError;
import org.apache.nifi.flowfile.FlowFile;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class AutoFlushSyncPutKuduResult extends PutKuduResult {
private final Map<FlowFile, List<RowError>> flowFileRowErrorsMap;
public AutoFlushSyncPutKuduResult() {
super();
this.flowFileRowErrorsMap = new HashMap<>();
}
@Override
public void recordOperation(final Operation operation) {
// This is a no-op: AUTO_FLUSH_SYNC does not buffer operations, so there is no need
// to record an Operation's originating FlowFile for a later buffered flush.
return;
}
@Override
public void addError(final RowError rowError) {
final List<RowError> rowErrors = flowFileRowErrorsMap.getOrDefault(flowFile, new ArrayList<>());
rowErrors.add(rowError);
flowFileRowErrorsMap.put(flowFile, rowErrors);
}
@Override
public void addErrors(final List<RowError> rowErrors) {
// This is a no-op: with AUTO_FLUSH_SYNC we never receive RowErrors in bulk, and because
// no Operation-to-FlowFile mapping is kept, a batch of RowErrors could not be resolved
// back to the FlowFiles that caused them anyway.
return;
}
@Override
public boolean hasRowErrorsOrFailures() {
if (!flowFileFailures.isEmpty()) {
return true;
}
for (final Map.Entry<FlowFile, List<RowError>> entry : flowFileRowErrorsMap.entrySet()) {
if (!entry.getValue().isEmpty()) {
return true;
}
}
return false;
}
@Override
public List<RowError> getRowErrorsForFlowFile(final FlowFile flowFile) {
return flowFileRowErrorsMap.getOrDefault(flowFile, Collections.EMPTY_LIST);
}
}
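Since AUTO_FLUSH_SYNC returns an OperationResponse for every KuduSession.apply(), a RowError can be attributed immediately to the FlowFile being processed, which is what addError relies on. A short sketch of that write loop; flowFile, operation, and kuduSession are placeholders:

    // Illustrative sketch of synchronous error attribution under AUTO_FLUSH_SYNC.
    final PutKuduResult putKuduResult = new AutoFlushSyncPutKuduResult();
    putKuduResult.setFlowFile(flowFile);                // subsequent errors belong to this FlowFile
    final OperationResponse response = kuduSession.apply(operation);
    if (response != null && response.hasRowError()) {
        putKuduResult.addError(response.getRowError()); // stored under the current FlowFile
    } else {
        putKuduResult.incrementProcessedRecordsForFlowFile();
    }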

PutKudu.java

@@ -22,10 +22,8 @@ import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -357,53 +355,52 @@ public class PutKudu extends AbstractKuduProcessor {
     }

     private void processFlowFiles(final ProcessContext context, final ProcessSession session, final List<FlowFile> flowFiles, final KuduClient kuduClient) {
-        final Map<FlowFile, Integer> processedRecords = new HashMap<>();
-        final Map<FlowFile, Object> flowFileFailures = new HashMap<>();
-        final Map<Operation, FlowFile> operationFlowFileMap = new HashMap<>();
-        final List<RowError> pendingRowErrors = new ArrayList<>();
         final KuduSession kuduSession = createKuduSession(kuduClient);
+        final PutKuduResult putKuduResult = flushMode == SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC
+                ? new AutoFlushSyncPutKuduResult() : new StandardPutKuduResult();

         try {
             processRecords(flowFiles,
-                    processedRecords,
-                    flowFileFailures,
-                    operationFlowFileMap,
-                    pendingRowErrors,
                     session,
                     context,
                     kuduClient,
-                    kuduSession);
+                    kuduSession,
+                    putKuduResult);
         } finally {
             try {
-                flushKuduSession(kuduSession, true, pendingRowErrors);
+                final List<RowError> rowErrors = closeKuduSession(kuduSession);
+                if (flushMode == SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND) {
+                    putKuduResult.addErrors(getPendingRowErrorsFromKuduSession(kuduSession));
+                } else {
+                    putKuduResult.addErrors(rowErrors);
+                }
             } catch (final KuduException|RuntimeException e) {
                 getLogger().error("KuduSession.close() Failed", e);
             }
         }

-        if (isRollbackOnFailure() && (!pendingRowErrors.isEmpty() || !flowFileFailures.isEmpty())) {
-            logFailures(pendingRowErrors, operationFlowFileMap);
+        putKuduResult.resolveFlowFileToRowErrorAssociations();
+
+        if (isRollbackOnFailure() && putKuduResult.hasRowErrorsOrFailures()) {
+            logFailures(putKuduResult);
             session.rollback();
             context.yield();
         } else {
-            transferFlowFiles(flowFiles, processedRecords, flowFileFailures, operationFlowFileMap, pendingRowErrors, session);
+            transferFlowFiles(flowFiles, session, putKuduResult);
         }
     }

     private void processRecords(final List<FlowFile> flowFiles,
-                                final Map<FlowFile, Integer> processedRecords,
-                                final Map<FlowFile, Object> flowFileFailures,
-                                final Map<Operation, FlowFile> operationFlowFileMap,
-                                final List<RowError> pendingRowErrors,
-                                final ProcessSession session,
-                                final ProcessContext context,
-                                final KuduClient kuduClient,
-                                final KuduSession kuduSession) {
+                                final ProcessSession session,
+                                final ProcessContext context,
+                                final KuduClient kuduClient,
+                                final KuduSession kuduSession,
+                                final PutKuduResult putKuduResult) {
         final RecordReaderFactory recordReaderFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
         int bufferedRecords = 0;
         OperationType prevOperationType = OperationType.INSERT;

         for (FlowFile flowFile : flowFiles) {
+            putKuduResult.setFlowFile(flowFile);
             try (final InputStream in = session.read(flowFile);
                  final RecordReader recordReader = recordReaderFactory.createRecordReader(flowFile, in, getLogger())) {
@@ -472,7 +469,12 @@ public class PutKudu extends AbstractKuduProcessor {
                         // ignore operations.
                         if (!supportsInsertIgnoreOp && prevOperationType != operationType
                                 && (prevOperationType == OperationType.INSERT_IGNORE || operationType == OperationType.INSERT_IGNORE)) {
-                            flushKuduSession(kuduSession, false, pendingRowErrors);
+                            final List<RowError> rowErrors = flushKuduSession(kuduSession);
+                            if (flushMode == SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND) {
+                                putKuduResult.addErrors(getPendingRowErrorsFromKuduSession(kuduSession));
+                            } else {
+                                putKuduResult.addErrors(rowErrors);
+                            }
                             kuduSession.setIgnoreAllDuplicateRows(operationType == OperationType.INSERT_IGNORE);
                         }
                         prevOperationType = operationType;
@@ -481,34 +483,35 @@ public class PutKudu extends AbstractKuduProcessor {
                         Operation operation = createKuduOperation(operationType, dataRecord, fieldNames, ignoreNull, lowercaseFields, kuduTable);
                         // We keep track of mappings between Operations and their origins,
                         // so that we know which FlowFiles should be marked failure after buffered flush.
-                        operationFlowFileMap.put(operation, flowFile);
+                        putKuduResult.recordOperation(operation);

                         // Flush mutation buffer of KuduSession to avoid "MANUAL_FLUSH is enabled
                         // but the buffer is too big" error. This can happen when flush mode is
                         // MANUAL_FLUSH and a FlowFile has more than one records.
                         if (bufferedRecords == batchSize && flushMode == SessionConfiguration.FlushMode.MANUAL_FLUSH) {
                             bufferedRecords = 0;
-                            flushKuduSession(kuduSession, false, pendingRowErrors);
+                            final List<RowError> rowErrors = flushKuduSession(kuduSession);
+                            putKuduResult.addErrors(rowErrors);
                         }

                         // OperationResponse is returned only when flush mode is set to AUTO_FLUSH_SYNC
-                        OperationResponse response = kuduSession.apply(operation);
+                        final OperationResponse response = kuduSession.apply(operation);
                         if (response != null && response.hasRowError()) {
                             // Stop processing the records on the first error.
                             // Note that Kudu does not support rolling back of previous operations.
-                            flowFileFailures.put(flowFile, response.getRowError());
+                            putKuduResult.addFailure(response.getRowError());
                             break recordReaderLoop;
                         }

                         bufferedRecords++;
-                        processedRecords.merge(flowFile, 1, Integer::sum);
+                        putKuduResult.incrementProcessedRecordsForFlowFile();
                     }

                     record = recordSet.next();
                 }
             } catch (Exception ex) {
                 getLogger().error("Failed to push {} to Kudu", flowFile, ex);
-                flowFileFailures.put(flowFile, ex);
+                putKuduResult.addFailure(ex);
             }
         }
     }
@@ -575,38 +578,28 @@ public class PutKudu extends AbstractKuduProcessor {
     }

     private void transferFlowFiles(final List<FlowFile> flowFiles,
-                                   final Map<FlowFile, Integer> processedRecords,
-                                   final Map<FlowFile, Object> flowFileFailures,
-                                   final Map<Operation, FlowFile> operationFlowFileMap,
-                                   final List<RowError> pendingRowErrors,
-                                   final ProcessSession session) {
-        // Find RowErrors for each FlowFile
-        final Map<FlowFile, List<RowError>> flowFileRowErrors = pendingRowErrors.stream()
-                .filter(e -> operationFlowFileMap.get(e.getOperation()) != null)
-                .collect(
-                        Collectors.groupingBy(e -> operationFlowFileMap.get(e.getOperation()))
-                );
+                                   final ProcessSession session,
+                                   final PutKuduResult putKuduResult) {
         long totalCount = 0L;
         for (FlowFile flowFile : flowFiles) {
-            final int count = processedRecords.getOrDefault(flowFile, 0);
+            final int count = putKuduResult.getProcessedRecordsForFlowFile(flowFile);
             totalCount += count;
-            final List<RowError> rowErrors = flowFileRowErrors.get(flowFile);
+            final List<RowError> rowErrors = putKuduResult.getRowErrorsForFlowFile(flowFile);

-            if (rowErrors != null) {
+            if (rowErrors != null && !rowErrors.isEmpty()) {
                 rowErrors.forEach(rowError -> getLogger().error("Failed to write due to {}", rowError.toString()));
                 flowFile = session.putAttribute(flowFile, RECORD_COUNT_ATTR, Integer.toString(count - rowErrors.size()));
-                totalCount -= rowErrors.size(); // Don't include error rows in the the counter.
+                totalCount -= rowErrors.size(); // Don't include error rows in the counter.
                 session.transfer(flowFile, REL_FAILURE);
             } else {
                 flowFile = session.putAttribute(flowFile, RECORD_COUNT_ATTR, String.valueOf(count));
-                if (flowFileFailures.containsKey(flowFile)) {
-                    getLogger().error("Failed to write due to {}", flowFileFailures.get(flowFile));
-                    session.transfer(flowFile, REL_FAILURE);
-                } else {
+                if (putKuduResult.isFlowFileProcessedSuccessfully(flowFile)) {
                     session.transfer(flowFile, REL_SUCCESS);
                     session.getProvenanceReporter().send(flowFile, "Successfully added FlowFile to Kudu");
+                } else {
+                    getLogger().error("Failed to write due to {}", putKuduResult.getFailureForFlowFile(flowFile));
+                    session.transfer(flowFile, REL_FAILURE);
                 }
             }
@@ -614,15 +607,14 @@ public class PutKudu extends AbstractKuduProcessor {
         session.adjustCounter("Records Inserted", totalCount, false);
     }

-    private void logFailures(final List<RowError> pendingRowErrors, final Map<Operation, FlowFile> operationFlowFileMap) {
-        final Map<FlowFile, List<RowError>> flowFileRowErrors = pendingRowErrors.stream().collect(
-                Collectors.groupingBy(e -> operationFlowFileMap.get(e.getOperation())));
+    private void logFailures(final PutKuduResult putKuduResult) {
+        final Set<FlowFile> processedFlowFiles = putKuduResult.getProcessedFlowFiles();
+        for (final FlowFile flowFile : processedFlowFiles) {
+            final List<RowError> errors = putKuduResult.getRowErrorsForFlowFile(flowFile);
+            if (!errors.isEmpty()) {
+                getLogger().error("Could not write {} to Kudu due to: {}", flowFile, errors);
+            }
-        for (final Map.Entry<FlowFile, List<RowError>> entry : flowFileRowErrors.entrySet()) {
-            final FlowFile flowFile = entry.getKey();
-            final List<RowError> errors = entry.getValue();
-            getLogger().error("Could not write {} to Kudu due to: {}", flowFile, errors);
         }
     }
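Taken together, processFlowFiles now drives a single result object through a fixed lifecycle instead of threading four separate collections through every method. A condensed, illustrative view of that lifecycle, with placeholder variables:

    // Illustrative sketch of the result object's lifecycle, in the order PutKudu uses it.
    final PutKuduResult result = flushMode == SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC
            ? new AutoFlushSyncPutKuduResult() : new StandardPutKuduResult();
    result.setFlowFile(flowFile);                     // 1. scope subsequent calls to this FlowFile
    result.recordOperation(operation);                // 2. remember the Operation's origin (buffered modes only)
    result.incrementProcessedRecordsForFlowFile();    // 3. count each successfully applied record
    result.addErrors(flushKuduSession(kuduSession));  // 4. collect RowErrors reported by a buffered flush
    result.resolveFlowFileToRowErrorAssociations();   // 5. map pending RowErrors back to FlowFiles
    final boolean rollback = isRollbackOnFailure() && result.hasRowErrorsOrFailures();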

PutKuduResult.java

@@ -0,0 +1,144 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.kudu;
import org.apache.kudu.client.Operation;
import org.apache.kudu.client.RowError;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.serialization.record.Record;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
public abstract class PutKuduResult {
protected FlowFile flowFile;
protected final Map<FlowFile, Object> flowFileFailures;
private final Set<FlowFile> processedFlowFiles;
private final Map<FlowFile, Integer> processedRecords;
public PutKuduResult() {
this.flowFile = null;
this.flowFileFailures = new HashMap<>();
this.processedFlowFiles = new HashSet<>();
this.processedRecords = new HashMap<>();
}
public void setFlowFile(final FlowFile flowFile) {
this.flowFile = flowFile;
processedFlowFiles.add(flowFile);
}
public Set<FlowFile> getProcessedFlowFiles() {
return this.processedFlowFiles;
}
public int getProcessedRecordsForFlowFile(final FlowFile flowFile) {
return this.processedRecords.getOrDefault(flowFile, 0);
}
/**
* Increments the number of {@link Record}s that have been successfully processed for this {@link FlowFile}
*/
public void incrementProcessedRecordsForFlowFile() {
final int newCount = this.processedRecords.getOrDefault(flowFile, 0) + 1;
this.processedRecords.put(flowFile, newCount);
}
/**
* Records an {@link Operation} being processed for a specific {@link FlowFile}
* @param operation the {@link Operation} to record
*/
public abstract void recordOperation(final Operation operation);
/**
* Records a {@link RowError} for the particular {@link FlowFile} that's being processed
* @param rowError the {@link RowError} to add
*/
public abstract void addError(final RowError rowError);
/**
* Records a {@link List} of {@link RowError}s for the particular {@link FlowFile} that's being processed
* @param rowErrors the {@link List} of {@link RowError}s to add
*/
public void addErrors(final List<RowError> rowErrors) {
for (final RowError rowError : rowErrors) {
addError(rowError);
}
}
/**
* Records a failure (an {@link Exception} or a {@link RowError}) for the particular {@link FlowFile} that's being processed.
* A failure is defined as anything that stops the processing of the records in a {@link FlowFile}
* @param failure the {@link Exception} or {@link RowError} to add
*/
public void addFailure(final Object failure) {
if (flowFileFailures.containsKey(flowFile)) {
throw new IllegalStateException("A failure has already occurred while processing this FlowFile.");
}
flowFileFailures.put(flowFile, failure);
}
/**
* Resolves the associations between {@link FlowFile}s and the {@link RowError}s that occurred
* while processing them. This is only applicable to batched session flushes, namely the
* {@code SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND} and
* {@code SessionConfiguration.FlushMode.MANUAL_FLUSH} flush modes. Otherwise, this
* function should be a no-op. It should only be called after all {@link FlowFile}s in a
* batch have been processed.
*/
public void resolveFlowFileToRowErrorAssociations() {
return;
}
/**
* Checks whether there was a failure (i.e. either an {@link Exception} or {@link RowError} that happened during processing)
* @return {@code true} if an {@link Exception} or a {@link RowError} occurred during processing, {@code false} otherwise
*/
public abstract boolean hasRowErrorsOrFailures();
/**
* Checks whether the {@link FlowFile} was processed successfully (i.e. no {@link Exception}s or
* {@link RowError}s occurred while processing the {@link FlowFile}).
*
* @param flowFile {@link FlowFile} to check
* @return {@code true} if processing the {@link FlowFile} did not incur any exceptions, {@code false} otherwise
*/
public boolean isFlowFileProcessedSuccessfully(final FlowFile flowFile) {
return !flowFileFailures.containsKey(flowFile);
}
/**
* Returns the failure ({@link Exception} or {@link RowError}) that occurred while processing the {@link FlowFile}
* @param flowFile the {@link FlowFile} to check
* @return the {@link Exception} or {@link RowError} that occurred while processing the given {@link FlowFile}, or {@code null} if none occurred
*/
public Object getFailureForFlowFile(final FlowFile flowFile) {
return flowFileFailures.get(flowFile);
}
/**
* Retrieves the {@link RowError}s that have occurred when processing a {@link FlowFile}
* @param flowFile the {@link FlowFile} to retrieve the {@link RowError}s of
* @return a {@link List} of {@link RowError}s for the {@link FlowFile}, or an empty {@link List} if no errors occurred
*/
public abstract List<RowError> getRowErrorsForFlowFile(final FlowFile flowFile);
}
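One contract worth noting: addFailure records at most one failure per FlowFile, since a failure by definition stops processing of that FlowFile's records. A behavioral sketch, where flowFile and the exception are placeholders:

    // Illustrative sketch of the one-failure-per-FlowFile contract of addFailure().
    putKuduResult.setFlowFile(flowFile);
    putKuduResult.addFailure(new IOException("record could not be parsed"));
    putKuduResult.isFlowFileProcessedSuccessfully(flowFile);  // now returns false
    putKuduResult.getFailureForFlowFile(flowFile);            // returns the IOException above
    // A second addFailure() for the same FlowFile would throw IllegalStateException.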

StandardPutKuduResult.java

@@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.kudu;
import org.apache.kudu.client.Operation;
import org.apache.kudu.client.RowError;
import org.apache.nifi.flowfile.FlowFile;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
public class StandardPutKuduResult extends PutKuduResult {
private final Map<Operation, FlowFile> operationFlowFileMap;
private final List<RowError> pendingRowErrors;
private final Map<FlowFile, List<RowError>> flowFileRowErrorsMap;
public StandardPutKuduResult() {
super();
this.operationFlowFileMap = new HashMap<>();
this.pendingRowErrors = new ArrayList<>();
this.flowFileRowErrorsMap = new HashMap<>();
}
@Override
public void recordOperation(final Operation operation) {
operationFlowFileMap.put(operation, flowFile);
}
@Override
public void addError(final RowError rowError) {
// When this class is used to store results from processing FlowFiles, the FlushMode
// is AUTO_FLUSH_BACKGROUND or MANUAL_FLUSH. In either case, RowErrors are obtained from
// the KuduSession only after the FlowFile/Record has been processed, so at this point we
// cannot tell which FlowFile/Record produced a given RowError.
this.pendingRowErrors.add(rowError);
}
@Override
public void resolveFlowFileToRowErrorAssociations() {
flowFileRowErrorsMap.putAll(pendingRowErrors.stream()
.filter(e -> operationFlowFileMap.get(e.getOperation()) != null)
.collect(
Collectors.groupingBy(e -> operationFlowFileMap.get(e.getOperation()))
)
);
pendingRowErrors.clear();
}
@Override
public boolean hasRowErrorsOrFailures() {
if (!flowFileFailures.isEmpty()) {
return true;
}
return flowFileRowErrorsMap.entrySet()
.stream()
.anyMatch(entry -> !entry.getValue().isEmpty());
}
@Override
public List<RowError> getRowErrorsForFlowFile(final FlowFile flowFile) {
return flowFileRowErrorsMap.getOrDefault(flowFile, Collections.EMPTY_LIST);
}
}
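The resolution step above is a plain Collectors.groupingBy: each pending RowError is keyed by the FlowFile that produced its Operation, and errors whose Operation was never recorded are dropped by the filter. Isolated as a sketch, with placeholder variables:

    // Illustrative sketch: group pending RowErrors by their originating FlowFile.
    final Map<FlowFile, List<RowError>> errorsByFlowFile = pendingRowErrors.stream()
            .filter(rowError -> operationFlowFileMap.containsKey(rowError.getOperation()))
            .collect(Collectors.groupingBy(rowError -> operationFlowFileMap.get(rowError.getOperation())));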