From 2f42b44efa2f29a0380b5087e249d7975118a737 Mon Sep 17 00:00:00 2001 From: Matt Burgess Date: Thu, 1 Feb 2024 19:04:26 -0500 Subject: [PATCH] NIFI-12731: Ensure state is updated in GetHBase whenever the session is committed Signed-off-by: Pierre Villard This closes #8346. --- .../java/org/apache/nifi/hbase/GetHBase.java | 86 +++++++++---------- .../apache/nifi/hbase/scan/ResultHandler.java | 4 +- 2 files changed, 44 insertions(+), 46 deletions(-) diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/GetHBase.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/GetHBase.java index 8b2cf0d973..9e77167acc 100644 --- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/GetHBase.java +++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/GetHBase.java @@ -140,7 +140,7 @@ public class GetHBase extends AbstractProcessor implements VisibilityFetchSuppor .build(); private final AtomicReference lastResult = new AtomicReference<>(); - private volatile List columns = new ArrayList<>(); + private final List columns = new ArrayList<>(); private volatile String previousTable = null; @Override @@ -201,11 +201,11 @@ public class GetHBase extends AbstractProcessor implements VisibilityFetchSuppor for (final String column : columns) { if (column.contains(":")) { final String[] parts = column.split(":"); - final byte[] cf = parts[0].getBytes(Charset.forName("UTF-8")); - final byte[] cq = parts[1].getBytes(Charset.forName("UTF-8")); + final byte[] cf = parts[0].getBytes(StandardCharsets.UTF_8); + final byte[] cq = parts[1].getBytes(StandardCharsets.UTF_8); this.columns.add(new Column(cf, cq)); } else { - final byte[] cf = column.getBytes(Charset.forName("UTF-8")); + final byte[] cf = column.getBytes(StandardCharsets.UTF_8); this.columns.add(new Column(cf, null)); } } @@ -307,11 +307,7 @@ public class GetHBase extends AbstractProcessor implements VisibilityFetchSuppor final byte[] cellValue = Arrays.copyOfRange(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength() + cell.getValueOffset()); final String rowHash = new String(rowValue, StandardCharsets.UTF_8); - Set cellHashes = cellsMatchingTimestamp.get(rowHash); - if (cellHashes == null) { - cellHashes = new HashSet<>(); - cellsMatchingTimestamp.put(rowHash, cellHashes); - } + Set cellHashes = cellsMatchingTimestamp.computeIfAbsent(rowHash, k -> new HashSet<>()); cellHashes.add(new String(cellValue, StandardCharsets.UTF_8)); } } @@ -336,40 +332,11 @@ public class GetHBase extends AbstractProcessor implements VisibilityFetchSuppor rowsPulledHolder.set(++rowsPulled); if (++rowsPulled % getBatchSize() == 0) { - session.commitAsync(); + updateStateAndCommit(session, latestTimestampHolder.get(), cellsMatchingTimestamp); } }); - final ScanResult scanResults = new ScanResult(latestTimestampHolder.get(), cellsMatchingTimestamp); - - final ScanResult latestResult = lastResult.get(); - if (latestResult == null || scanResults.getTimestamp() > latestResult.getTimestamp()) { - session.setState(scanResults.toFlatMap(), Scope.CLUSTER); - session.commitAsync(() -> updateScanResultsIfNewer(scanResults)); - } else if (scanResults.getTimestamp() == latestResult.getTimestamp()) { - final Map> combinedResults = new HashMap<>(scanResults.getMatchingCells()); - - // copy the results of result.getMatchingCells() to combinedResults. - // do a deep copy because the Set may be modified below. - for (final Map.Entry> entry : scanResults.getMatchingCells().entrySet()) { - combinedResults.put(entry.getKey(), new HashSet<>(entry.getValue())); - } - - // combined the results from 'lastResult' - for (final Map.Entry> entry : latestResult.getMatchingCells().entrySet()) { - final Set existing = combinedResults.get(entry.getKey()); - if (existing == null) { - combinedResults.put(entry.getKey(), new HashSet<>(entry.getValue())); - } else { - existing.addAll(entry.getValue()); - } - } - - final ScanResult scanResult = new ScanResult(scanResults.getTimestamp(), combinedResults); - session.setState(scanResult.toFlatMap(), Scope.CLUSTER); - - session.commitAsync(() -> updateScanResultsIfNewer(scanResult)); - } + updateStateAndCommit(session, latestTimestampHolder.get(), cellsMatchingTimestamp); } catch (final IOException e) { getLogger().error("Failed to receive data from HBase due to {}", e); session.rollback(); @@ -380,6 +347,39 @@ public class GetHBase extends AbstractProcessor implements VisibilityFetchSuppor } } + private void updateStateAndCommit(final ProcessSession session, final long latestTimestamp, final Map> cellsMatchingTimestamp) throws IOException { + final ScanResult scanResults = new ScanResult(latestTimestamp, cellsMatchingTimestamp); + + final ScanResult latestResult = lastResult.get(); + if (latestResult == null || scanResults.getTimestamp() > latestResult.getTimestamp()) { + session.setState(scanResults.toFlatMap(), Scope.CLUSTER); + session.commitAsync(() -> updateScanResultsIfNewer(scanResults)); + } else if (scanResults.getTimestamp() == latestResult.getTimestamp()) { + final Map> combinedResults = new HashMap<>(scanResults.getMatchingCells()); + + // copy the results of result.getMatchingCells() to combinedResults. + // do a deep copy because the Set may be modified below. + for (final Map.Entry> entry : scanResults.getMatchingCells().entrySet()) { + combinedResults.put(entry.getKey(), new HashSet<>(entry.getValue())); + } + + // combined the results from 'lastResult' + for (final Map.Entry> entry : latestResult.getMatchingCells().entrySet()) { + final Set existing = combinedResults.get(entry.getKey()); + if (existing == null) { + combinedResults.put(entry.getKey(), new HashSet<>(entry.getValue())); + } else { + existing.addAll(entry.getValue()); + } + } + + final ScanResult scanResult = new ScanResult(scanResults.getTimestamp(), combinedResults); + session.setState(scanResult.toFlatMap(), Scope.CLUSTER); + + session.commitAsync(() -> updateScanResultsIfNewer(scanResult)); + } + } + private void updateScanResultsIfNewer(final ScanResult scanResult) { lastResult.getAndUpdate(current -> (current == null || scanResult.getTimestamp() > current.getTimestamp()) ? scanResult : current); } @@ -495,11 +495,7 @@ public class GetHBase extends AbstractProcessor implements VisibilityFetchSuppor final String rowIndex = matcher.group(1); final String cellIndex = matcher.group(3); - Set cellHashes = rowIndexToMatchingCellHashes.get(rowIndex); - if (cellHashes == null) { - cellHashes = new HashSet<>(); - rowIndexToMatchingCellHashes.put(rowIndex, cellHashes); - } + Set cellHashes = rowIndexToMatchingCellHashes.computeIfAbsent(rowIndex, k -> new HashSet<>()); if (cellIndex == null) { // this provides a Row ID. diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/scan/ResultHandler.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/scan/ResultHandler.java index d0f1eab9ca..9d5e744d12 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/scan/ResultHandler.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/scan/ResultHandler.java @@ -16,11 +16,13 @@ */ package org.apache.nifi.hbase.scan; +import java.io.IOException; + /** * Handles a single row from an HBase scan. */ public interface ResultHandler { - void handle(byte[] row, ResultCell[] resultCells); + void handle(byte[] row, ResultCell[] resultCells) throws IOException; }