From 3719fddf84ffcf18ed29d87ee931aa1acc1699d3 Mon Sep 17 00:00:00 2001 From: emiliosetiadarma Date: Mon, 22 Jan 2024 01:28:09 -0800 Subject: [PATCH] NIFI-12700: refactored PutKudu to optimize memory handling for AUTO_FLUSH_SYNC flush mode (unbatched flush) NIFI-12700: made changes based on PR comments. Simplified statements involving determination of whether or not there are flowfile failures/rowErrors. Separated out getting rowErrors from OperationResponses into its own function Signed-off-by: Matt Burgess This closes #8322 --- .../kudu/AbstractKuduProcessor.java | 40 ++++- .../kudu/AutoFlushSyncPutKuduResult.java | 78 ++++++++++ .../apache/nifi/processors/kudu/PutKudu.java | 110 +++++++------ .../nifi/processors/kudu/PutKuduResult.java | 144 ++++++++++++++++++ .../kudu/StandardPutKuduResult.java | 83 ++++++++++ 5 files changed, 388 insertions(+), 67 deletions(-) create mode 100644 nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AutoFlushSyncPutKuduResult.java create mode 100644 nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKuduResult.java create mode 100644 nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/StandardPutKuduResult.java diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java index 47c1a34903..b44f2330ee 100644 --- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java +++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java @@ -22,6 +22,7 @@ import java.sql.Date; import java.sql.Timestamp; import java.time.LocalDate; import 
java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Optional; import java.util.concurrent.Executor; @@ -35,6 +36,8 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Consumer; +import java.util.stream.Collectors; + import org.apache.kudu.ColumnSchema; import org.apache.kudu.ColumnTypeAttributes; import org.apache.kudu.Schema; @@ -217,19 +220,33 @@ public abstract class AbstractKuduProcessor extends AbstractProcessor { } } - protected void flushKuduSession(final KuduSession kuduSession, boolean close, final List rowErrors) throws KuduException { - final List responses = close ? kuduSession.close() : kuduSession.flush(); - + /** + * Get the pending errors from the active {@link KuduSession}. This will only be applicable if the flushMode is + * {@code SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND}. + * @return a {@link List} of pending {@link RowError}s + */ + protected List getPendingRowErrorsFromKuduSession(final KuduSession kuduSession) { if (kuduSession.getFlushMode() == SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND) { - rowErrors.addAll(Arrays.asList(kuduSession.getPendingErrors().getRowErrors())); + return Arrays.asList(kuduSession.getPendingErrors().getRowErrors()); } else { - responses.stream() - .filter(OperationResponse::hasRowError) - .map(OperationResponse::getRowError) - .forEach(rowErrors::add); + return Collections.EMPTY_LIST; } } + protected List flushKuduSession(final KuduSession kuduSession) throws KuduException { + final List responses = kuduSession.flush(); + // RowErrors will only be present in the OperationResponses in this case if the flush mode + // selected is MANUAL_FLUSH. It will be empty otherwise. 
+ return getRowErrors(responses); + } + + protected List closeKuduSession(final KuduSession kuduSession) throws KuduException { + final List responses = kuduSession.close(); + // RowErrors will only be present in the OperationResponses in this case if the flush mode + // selected is MANUAL_FLUSH, since the underlying implementation of kuduSession.close() returns + // the OperationResponses from a flush() call. + return getRowErrors(responses); + } @OnStopped public void shutdown() throws Exception { @@ -410,4 +427,11 @@ public abstract class AbstractKuduProcessor extends AbstractProcessor { return String.format("PutKudu[%s]-client-%d", identifier, threadCount.getAndIncrement()); } } + + private List getRowErrors(final List responses) { + return responses.stream() + .filter(OperationResponse::hasRowError) + .map(OperationResponse::getRowError) + .collect(Collectors.toList()); + } } diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AutoFlushSyncPutKuduResult.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AutoFlushSyncPutKuduResult.java new file mode 100644 index 0000000000..d502741688 --- /dev/null +++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AutoFlushSyncPutKuduResult.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.kudu; + +import org.apache.kudu.client.Operation; +import org.apache.kudu.client.RowError; +import org.apache.nifi.flowfile.FlowFile; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class AutoFlushSyncPutKuduResult extends PutKuduResult { + private final Map> flowFileRowErrorsMap; + + public AutoFlushSyncPutKuduResult() { + super(); + this.flowFileRowErrorsMap = new HashMap<>(); + } + + @Override + public void recordOperation(final Operation operation) { + // this should be a no-op because we don't need to record Operation's origins + // for buffered flush when using AUTO_FLUSH_SYNC + return; + } + + @Override + public void addError(final RowError rowError) { + final List rowErrors = flowFileRowErrorsMap.getOrDefault(flowFile, new ArrayList<>()); + rowErrors.add(rowError); + flowFileRowErrorsMap.put(flowFile, rowErrors); + } + + @Override + public void addErrors(final List rowErrors) { + // This is a no-op because we would never be in a situation where we'd have to add a collection of RowError + // using this Flush Mode. 
Since we do not keep Operation to FlowFile mapping, it will also be impossible to resolve + // RowErrors to the FlowFile that caused them, hence this method should never be implemented for AUTO_FLUSH_SYNC + return; + } + + @Override + public boolean hasRowErrorsOrFailures() { + if (!flowFileFailures.isEmpty()) { + return true; + } + + for (final Map.Entry> entry : flowFileRowErrorsMap.entrySet()) { + if (!entry.getValue().isEmpty()) { + return true; + } + } + + return false; + } + + @Override + public List getRowErrorsForFlowFile(final FlowFile flowFile) { + return flowFileRowErrorsMap.getOrDefault(flowFile, Collections.EMPTY_LIST); + } +} diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java index eaa07617c2..a1317b3fd8 100644 --- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java +++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java @@ -22,10 +22,8 @@ import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; -import java.util.HashMap; import java.util.HashSet; import java.util.List; -import java.util.Map; import java.util.Set; import java.util.function.Function; import java.util.stream.Collectors; @@ -357,53 +355,52 @@ public class PutKudu extends AbstractKuduProcessor { } private void processFlowFiles(final ProcessContext context, final ProcessSession session, final List flowFiles, final KuduClient kuduClient) { - final Map processedRecords = new HashMap<>(); - final Map flowFileFailures = new HashMap<>(); - final Map operationFlowFileMap = new HashMap<>(); - final List pendingRowErrors = new ArrayList<>(); - final KuduSession kuduSession = createKuduSession(kuduClient); + 
final PutKuduResult putKuduResult = flushMode == SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC + ? new AutoFlushSyncPutKuduResult() : new StandardPutKuduResult(); try { processRecords(flowFiles, - processedRecords, - flowFileFailures, - operationFlowFileMap, - pendingRowErrors, session, context, kuduClient, - kuduSession); + kuduSession, + putKuduResult); } finally { try { - flushKuduSession(kuduSession, true, pendingRowErrors); + final List rowErrors = closeKuduSession(kuduSession); + if (flushMode == SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND) { + putKuduResult.addErrors(getPendingRowErrorsFromKuduSession(kuduSession)); + } else { + putKuduResult.addErrors(rowErrors); + } } catch (final KuduException|RuntimeException e) { getLogger().error("KuduSession.close() Failed", e); } } - if (isRollbackOnFailure() && (!pendingRowErrors.isEmpty() || !flowFileFailures.isEmpty())) { - logFailures(pendingRowErrors, operationFlowFileMap); + putKuduResult.resolveFlowFileToRowErrorAssociations(); + + if (isRollbackOnFailure() && putKuduResult.hasRowErrorsOrFailures()) { + logFailures(putKuduResult); session.rollback(); context.yield(); } else { - transferFlowFiles(flowFiles, processedRecords, flowFileFailures, operationFlowFileMap, pendingRowErrors, session); + transferFlowFiles(flowFiles, session, putKuduResult); } } private void processRecords(final List flowFiles, - final Map processedRecords, - final Map flowFileFailures, - final Map operationFlowFileMap, - final List pendingRowErrors, - final ProcessSession session, - final ProcessContext context, - final KuduClient kuduClient, - final KuduSession kuduSession) { + final ProcessSession session, + final ProcessContext context, + final KuduClient kuduClient, + final KuduSession kuduSession, + final PutKuduResult putKuduResult) { final RecordReaderFactory recordReaderFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class); int bufferedRecords = 0; OperationType prevOperationType = 
OperationType.INSERT; for (FlowFile flowFile : flowFiles) { + putKuduResult.setFlowFile(flowFile); try (final InputStream in = session.read(flowFile); final RecordReader recordReader = recordReaderFactory.createRecordReader(flowFile, in, getLogger())) { @@ -472,7 +469,12 @@ public class PutKudu extends AbstractKuduProcessor { // ignore operations. if (!supportsInsertIgnoreOp && prevOperationType != operationType && (prevOperationType == OperationType.INSERT_IGNORE || operationType == OperationType.INSERT_IGNORE)) { - flushKuduSession(kuduSession, false, pendingRowErrors); + final List rowErrors = flushKuduSession(kuduSession); + if (flushMode == SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND) { + putKuduResult.addErrors(getPendingRowErrorsFromKuduSession(kuduSession)); + } else { + putKuduResult.addErrors(rowErrors); + } kuduSession.setIgnoreAllDuplicateRows(operationType == OperationType.INSERT_IGNORE); } prevOperationType = operationType; @@ -481,34 +483,35 @@ public class PutKudu extends AbstractKuduProcessor { Operation operation = createKuduOperation(operationType, dataRecord, fieldNames, ignoreNull, lowercaseFields, kuduTable); // We keep track of mappings between Operations and their origins, // so that we know which FlowFiles should be marked failure after buffered flush. - operationFlowFileMap.put(operation, flowFile); + putKuduResult.recordOperation(operation); // Flush mutation buffer of KuduSession to avoid "MANUAL_FLUSH is enabled // but the buffer is too big" error. This can happen when flush mode is // MANUAL_FLUSH and a FlowFile has more than one records. 
if (bufferedRecords == batchSize && flushMode == SessionConfiguration.FlushMode.MANUAL_FLUSH) { bufferedRecords = 0; - flushKuduSession(kuduSession, false, pendingRowErrors); + final List rowErrors = flushKuduSession(kuduSession); + putKuduResult.addErrors(rowErrors); } // OperationResponse is returned only when flush mode is set to AUTO_FLUSH_SYNC - OperationResponse response = kuduSession.apply(operation); + final OperationResponse response = kuduSession.apply(operation); if (response != null && response.hasRowError()) { // Stop processing the records on the first error. // Note that Kudu does not support rolling back of previous operations. - flowFileFailures.put(flowFile, response.getRowError()); + putKuduResult.addFailure(response.getRowError()); break recordReaderLoop; } bufferedRecords++; - processedRecords.merge(flowFile, 1, Integer::sum); + putKuduResult.incrementProcessedRecordsForFlowFile(); } record = recordSet.next(); } } catch (Exception ex) { getLogger().error("Failed to push {} to Kudu", flowFile, ex); - flowFileFailures.put(flowFile, ex); + putKuduResult.addFailure(ex); } } } @@ -575,38 +578,28 @@ public class PutKudu extends AbstractKuduProcessor { } private void transferFlowFiles(final List flowFiles, - final Map processedRecords, - final Map flowFileFailures, - final Map operationFlowFileMap, - final List pendingRowErrors, - final ProcessSession session) { - // Find RowErrors for each FlowFile - final Map> flowFileRowErrors = pendingRowErrors.stream() - .filter(e -> operationFlowFileMap.get(e.getOperation()) != null) - .collect( - Collectors.groupingBy(e -> operationFlowFileMap.get(e.getOperation())) - ); - + final ProcessSession session, + final PutKuduResult putKuduResult) { long totalCount = 0L; for (FlowFile flowFile : flowFiles) { - final int count = processedRecords.getOrDefault(flowFile, 0); + final int count = putKuduResult.getProcessedRecordsForFlowFile(flowFile); totalCount += count; - final List rowErrors = 
flowFileRowErrors.get(flowFile); + final List rowErrors = putKuduResult.getRowErrorsForFlowFile(flowFile); - if (rowErrors != null) { + if (rowErrors != null && !rowErrors.isEmpty()) { rowErrors.forEach(rowError -> getLogger().error("Failed to write due to {}", rowError.toString())); flowFile = session.putAttribute(flowFile, RECORD_COUNT_ATTR, Integer.toString(count - rowErrors.size())); - totalCount -= rowErrors.size(); // Don't include error rows in the the counter. + totalCount -= rowErrors.size(); // Don't include error rows in the counter. session.transfer(flowFile, REL_FAILURE); } else { flowFile = session.putAttribute(flowFile, RECORD_COUNT_ATTR, String.valueOf(count)); - if (flowFileFailures.containsKey(flowFile)) { - getLogger().error("Failed to write due to {}", flowFileFailures.get(flowFile)); - session.transfer(flowFile, REL_FAILURE); - } else { + if (putKuduResult.isFlowFileProcessedSuccessfully(flowFile)) { session.transfer(flowFile, REL_SUCCESS); session.getProvenanceReporter().send(flowFile, "Successfully added FlowFile to Kudu"); + } else { + getLogger().error("Failed to write due to {}", putKuduResult.getFailureForFlowFile(flowFile)); + session.transfer(flowFile, REL_FAILURE); } } } @@ -614,15 +607,14 @@ public class PutKudu extends AbstractKuduProcessor { session.adjustCounter("Records Inserted", totalCount, false); } - private void logFailures(final List pendingRowErrors, final Map operationFlowFileMap) { - final Map> flowFileRowErrors = pendingRowErrors.stream().collect( - Collectors.groupingBy(e -> operationFlowFileMap.get(e.getOperation()))); + private void logFailures(final PutKuduResult putKuduResult) { + final Set processedFlowFiles = putKuduResult.getProcessedFlowFiles(); + for (final FlowFile flowFile : processedFlowFiles) { + final List errors = putKuduResult.getRowErrorsForFlowFile(flowFile); + if (!errors.isEmpty()) { + getLogger().error("Could not write {} to Kudu due to: {}", flowFile, errors); + } - for (final Map.Entry> entry : 
flowFileRowErrors.entrySet()) { - final FlowFile flowFile = entry.getKey(); - final List errors = entry.getValue(); - - getLogger().error("Could not write {} to Kudu due to: {}", flowFile, errors); } } diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKuduResult.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKuduResult.java new file mode 100644 index 0000000000..f46a65d90d --- /dev/null +++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKuduResult.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.kudu; + +import org.apache.kudu.client.Operation; +import org.apache.kudu.client.RowError; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.serialization.record.Record; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +public abstract class PutKuduResult { + protected FlowFile flowFile; + protected final Map flowFileFailures; + private final Set processedFlowFiles; + private final Map processedRecords; + + public PutKuduResult() { + this.flowFile = null; + + this.flowFileFailures = new HashMap<>(); + this.processedFlowFiles = new HashSet<>(); + this.processedRecords = new HashMap<>(); + } + + public void setFlowFile(final FlowFile flowFile) { + this.flowFile = flowFile; + processedFlowFiles.add(flowFile); + } + + public Set getProcessedFlowFiles() { + return this.processedFlowFiles; + } + + public int getProcessedRecordsForFlowFile(final FlowFile flowFile) { + return this.processedRecords.getOrDefault(flowFile, 0); + } + + /** + * Increments the number of {@link Record}s that has been successfully processed for this {@link FlowFile} + */ + public void incrementProcessedRecordsForFlowFile() { + final int newCount = this.processedRecords.getOrDefault(flowFile, 0) + 1; + this.processedRecords.put(flowFile, newCount); + } + + /** + * Records an {@link Operation} being processed for a specific {@link FlowFile} + * @param operation the {@link Operation} to record + */ + public abstract void recordOperation(final Operation operation); + + /** + * Records a {@link RowError} for the particular {@link FlowFile} that's being processed + * @param rowError the {@link RowError} to add + */ + public abstract void addError(final RowError rowError); + + /** + * Records a {@link List} of {@link RowError}s for the particular {@link FlowFile} that's being processed + * @param rowErrors the {@link List} of {@link RowError}s to add + */ + public void 
addErrors(final List rowErrors) { + for (final RowError rowError : rowErrors) { + addError(rowError); + } + } + + /** + * Records a failure (an {@link Exception} or a {@link RowError}) for the particular {@link FlowFile} that's being processed. + * A failure is defined as anything that stops the processing of the records in a {@link FlowFile} + * @param failure the {@link Exception} or {@link RowError} to add + */ + public void addFailure(final Object failure) { + if (flowFileFailures.containsKey(flowFile)) { + throw new IllegalStateException("A failure has already previously occurred while processing FlowFile."); + } + flowFileFailures.put(flowFile, failure); + } + + + /** + * Resolves the associations between {@link FlowFile} and the {@link RowError}s that occurred + * while processing them. This is only applicable in batch session flushes, namely when + * using the {@code SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND} and + * {@code SessionConfiguration.FlushMode.MANUAL_FLUSH} flush modes. Otherwise, this + * function should be a no-op. This function should only be called once finished with processing + * all {@link FlowFile}s in a batch. + */ + public void resolveFlowFileToRowErrorAssociations() { + return; + } + + /** + * Checks whether there was a failure (i.e. either an {@link Exception} or {@link RowError} that happened during processing) + * @return {@code true} if there was an {@link Exception} or a {@link RowError} that happened during processing, {@code false} otherwise + */ + public abstract boolean hasRowErrorsOrFailures(); + + /** + * Checks whether the {@link FlowFile} was processed successfully (i.e. no {@link Exception}s or + * {@link RowError}s occurred while processing the {@link FlowFile}).
+ * + * @param flowFile {@link FlowFile} to check + * @return {@code true} if processing the {@link FlowFile} did not incur any exceptions, {@code false} otherwise + */ + public boolean isFlowFileProcessedSuccessfully(final FlowFile flowFile) { + return !flowFileFailures.containsKey(flowFile); + } + + /** + * Returns the failure ({@link Exception} or {@link RowError}) that occurred while processing the {@link FlowFile} + * @param flowFile the {@link FlowFile} to check + * @return the {@link Exception} or {@link RowError} if one occurred while processing the given {@link FlowFile} or {@code null} + */ + public Object getFailureForFlowFile(final FlowFile flowFile) { + return flowFileFailures.get(flowFile); + } + + /** + * Retrieves the {@link RowError}s that have occurred when processing a {@link FlowFile} + * @param flowFile the {@link FlowFile} to retrieve the {@link RowError}s of + * @return a {@link List} of {@link RowError}s for the {@link FlowFile} or an {@code Collections.EMPTY_LIST} if no errors + */ + public abstract List getRowErrorsForFlowFile(final FlowFile flowFile); +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/StandardPutKuduResult.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/StandardPutKuduResult.java new file mode 100644 index 0000000000..7b4a61119b --- /dev/null +++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/StandardPutKuduResult.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.kudu; + +import org.apache.kudu.client.Operation; +import org.apache.kudu.client.RowError; +import org.apache.nifi.flowfile.FlowFile; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public class StandardPutKuduResult extends PutKuduResult { + private final Map operationFlowFileMap; + private final List pendingRowErrors; + private final Map> flowFileRowErrorsMap; + + public StandardPutKuduResult() { + super(); + this.operationFlowFileMap = new HashMap<>(); + this.pendingRowErrors = new ArrayList<>(); + this.flowFileRowErrorsMap = new HashMap<>(); + } + + @Override + public void recordOperation(final Operation operation) { + operationFlowFileMap.put(operation, flowFile); + } + + @Override + public void addError(final RowError rowError) { + // When this class is used to store results from processing FlowFiles, the FlushMode + // is set to AUTO_FLUSH_BACKGROUND or MANUAL_FLUSH. 
In either case, we won't know which + // FlowFile/Record we are currently processing as the RowErrors are obtained from the KuduSession + // post-processing of the FlowFile/Record + this.pendingRowErrors.add(rowError); + } + + @Override + public void resolveFlowFileToRowErrorAssociations() { + flowFileRowErrorsMap.putAll(pendingRowErrors.stream() + .filter(e -> operationFlowFileMap.get(e.getOperation()) != null) + .collect( + Collectors.groupingBy(e -> operationFlowFileMap.get(e.getOperation())) + ) + ); + + pendingRowErrors.clear(); + } + + @Override + public boolean hasRowErrorsOrFailures() { + if (!flowFileFailures.isEmpty()) { + return true; + } + + return flowFileRowErrorsMap.entrySet() + .stream() + .anyMatch(entry -> !entry.getValue().isEmpty()); + } + + @Override + public List getRowErrorsForFlowFile(final FlowFile flowFile) { + return flowFileRowErrorsMap.getOrDefault(flowFile, Collections.EMPTY_LIST); + } +}