NIFI-12731: Ensure state is updated in GetHBase whenever the session is committed

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #8346.
This commit is contained in:
Matt Burgess 2024-02-01 19:04:26 -05:00 committed by Pierre Villard
parent c6f1d771e8
commit 2f42b44efa
No known key found for this signature in database
GPG Key ID: F92A93B30C07C6D5
2 changed files with 44 additions and 46 deletions

View File

@@ -140,7 +140,7 @@ public class GetHBase extends AbstractProcessor implements VisibilityFetchSuppor
.build(); .build();
private final AtomicReference<ScanResult> lastResult = new AtomicReference<>(); private final AtomicReference<ScanResult> lastResult = new AtomicReference<>();
private volatile List<Column> columns = new ArrayList<>(); private final List<Column> columns = new ArrayList<>();
private volatile String previousTable = null; private volatile String previousTable = null;
@Override @Override
@@ -201,11 +201,11 @@ public class GetHBase extends AbstractProcessor implements VisibilityFetchSuppor
for (final String column : columns) { for (final String column : columns) {
if (column.contains(":")) { if (column.contains(":")) {
final String[] parts = column.split(":"); final String[] parts = column.split(":");
final byte[] cf = parts[0].getBytes(Charset.forName("UTF-8")); final byte[] cf = parts[0].getBytes(StandardCharsets.UTF_8);
final byte[] cq = parts[1].getBytes(Charset.forName("UTF-8")); final byte[] cq = parts[1].getBytes(StandardCharsets.UTF_8);
this.columns.add(new Column(cf, cq)); this.columns.add(new Column(cf, cq));
} else { } else {
final byte[] cf = column.getBytes(Charset.forName("UTF-8")); final byte[] cf = column.getBytes(StandardCharsets.UTF_8);
this.columns.add(new Column(cf, null)); this.columns.add(new Column(cf, null));
} }
} }
@@ -307,11 +307,7 @@ public class GetHBase extends AbstractProcessor implements VisibilityFetchSuppor
final byte[] cellValue = Arrays.copyOfRange(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength() + cell.getValueOffset()); final byte[] cellValue = Arrays.copyOfRange(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength() + cell.getValueOffset());
final String rowHash = new String(rowValue, StandardCharsets.UTF_8); final String rowHash = new String(rowValue, StandardCharsets.UTF_8);
Set<String> cellHashes = cellsMatchingTimestamp.get(rowHash); Set<String> cellHashes = cellsMatchingTimestamp.computeIfAbsent(rowHash, k -> new HashSet<>());
if (cellHashes == null) {
cellHashes = new HashSet<>();
cellsMatchingTimestamp.put(rowHash, cellHashes);
}
cellHashes.add(new String(cellValue, StandardCharsets.UTF_8)); cellHashes.add(new String(cellValue, StandardCharsets.UTF_8));
} }
} }
@@ -336,11 +332,23 @@ public class GetHBase extends AbstractProcessor implements VisibilityFetchSuppor
rowsPulledHolder.set(++rowsPulled); rowsPulledHolder.set(++rowsPulled);
if (++rowsPulled % getBatchSize() == 0) { if (++rowsPulled % getBatchSize() == 0) {
session.commitAsync(); updateStateAndCommit(session, latestTimestampHolder.get(), cellsMatchingTimestamp);
} }
}); });
final ScanResult scanResults = new ScanResult(latestTimestampHolder.get(), cellsMatchingTimestamp); updateStateAndCommit(session, latestTimestampHolder.get(), cellsMatchingTimestamp);
} catch (final IOException e) {
getLogger().error("Failed to receive data from HBase due to {}", e);
session.rollback();
} finally {
// if we failed, we want to yield so that we don't hammer hbase. If we succeed, then we have
// pulled all of the records, so we want to wait a bit before hitting hbase again anyway.
context.yield();
}
}
private void updateStateAndCommit(final ProcessSession session, final long latestTimestamp, final Map<String, Set<String>> cellsMatchingTimestamp) throws IOException {
final ScanResult scanResults = new ScanResult(latestTimestamp, cellsMatchingTimestamp);
final ScanResult latestResult = lastResult.get(); final ScanResult latestResult = lastResult.get();
if (latestResult == null || scanResults.getTimestamp() > latestResult.getTimestamp()) { if (latestResult == null || scanResults.getTimestamp() > latestResult.getTimestamp()) {
@@ -370,14 +378,6 @@ public class GetHBase extends AbstractProcessor implements VisibilityFetchSuppor
session.commitAsync(() -> updateScanResultsIfNewer(scanResult)); session.commitAsync(() -> updateScanResultsIfNewer(scanResult));
} }
} catch (final IOException e) {
getLogger().error("Failed to receive data from HBase due to {}", e);
session.rollback();
} finally {
// if we failed, we want to yield so that we don't hammer hbase. If we succeed, then we have
// pulled all of the records, so we want to wait a bit before hitting hbase again anyway.
context.yield();
}
} }
private void updateScanResultsIfNewer(final ScanResult scanResult) { private void updateScanResultsIfNewer(final ScanResult scanResult) {
@@ -495,11 +495,7 @@ public class GetHBase extends AbstractProcessor implements VisibilityFetchSuppor
final String rowIndex = matcher.group(1); final String rowIndex = matcher.group(1);
final String cellIndex = matcher.group(3); final String cellIndex = matcher.group(3);
Set<String> cellHashes = rowIndexToMatchingCellHashes.get(rowIndex); Set<String> cellHashes = rowIndexToMatchingCellHashes.computeIfAbsent(rowIndex, k -> new HashSet<>());
if (cellHashes == null) {
cellHashes = new HashSet<>();
rowIndexToMatchingCellHashes.put(rowIndex, cellHashes);
}
if (cellIndex == null) { if (cellIndex == null) {
// this provides a Row ID. // this provides a Row ID.

View File

@@ -16,11 +16,13 @@
*/ */
package org.apache.nifi.hbase.scan; package org.apache.nifi.hbase.scan;
import java.io.IOException;
/** /**
* Handles a single row from an HBase scan. * Handles a single row from an HBase scan.
*/ */
public interface ResultHandler { public interface ResultHandler {
void handle(byte[] row, ResultCell[] resultCells); void handle(byte[] row, ResultCell[] resultCells) throws IOException;
} }