NIFI-12843: Fix incorrect read of parquet data, when record.count is inherited

This closes #8452.

Signed-off-by: Tamas Palfy <tpalfy@apache.org>
This commit is contained in:
Rajmund Takacs 2024-02-26 16:52:59 +01:00 committed by tpalfy
parent 40d9750bb3
commit f119c49c4d
2 changed files with 6 additions and 93 deletions

View File

@@ -61,10 +61,13 @@ public class ParquetRecordReader implements RecordReader {
final Long offset = Optional.ofNullable(variables.get(ParquetAttribute.RECORD_OFFSET))
.map(Long::parseLong)
.orElse(null);
final String recordCount = variables.get(ParquetAttribute.RECORD_COUNT);
recordsToRead = Optional.ofNullable(variables.get(ParquetAttribute.RECORD_COUNT))
.map(Long::parseLong)
.orElse(null);
if (offset != null && recordCount != null) {
recordsToRead = Long.parseLong(recordCount);
} else {
recordsToRead = null;
}
final long fileStartOffset = Optional.ofNullable(variables.get(ParquetAttribute.FILE_RANGE_START_OFFSET))
.map(Long::parseLong)

View File

@@ -17,7 +17,6 @@
package org.apache.nifi.parquet;
import static java.util.Collections.emptyMap;
import static java.util.Collections.singletonMap;
import static java.util.stream.Collectors.toMap;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -79,30 +78,6 @@ public class TestParquetReader {
.forEach(i -> assertEquals(ParquetTestUtils.createUser(i), convertRecordToUser(results.get(i))));
}
@Test
public void testReadUsersPartiallyWithLimitedRecordCount() throws IOException, MalformedRecordException {
final int numUsers = 25;
final int expectedRecords = 3;
final File parquetFile = ParquetTestUtils.createUsersParquetFile(numUsers);
final List<Record> results = getRecords(parquetFile, singletonMap(ParquetAttribute.RECORD_COUNT, "3"));
assertEquals(expectedRecords, results.size());
IntStream.range(0, expectedRecords)
.forEach(i -> assertEquals(ParquetTestUtils.createUser(i), convertRecordToUser(results.get(i))));
}
@Test
public void testReadUsersPartiallyWithOffset() throws IOException, MalformedRecordException {
final int numUsers = 1000025; // intentionally so large, to test input with many record groups
final int expectedRecords = 5;
final File parquetFile = ParquetTestUtils.createUsersParquetFile(numUsers);
final List<Record> results = getRecords(parquetFile, singletonMap(ParquetAttribute.RECORD_OFFSET, "1000020"));
assertEquals(expectedRecords, results.size());
IntStream.range(0, expectedRecords)
.forEach(i -> assertEquals(ParquetTestUtils.createUser(i + 1000020), convertRecordToUser(results.get(i))));
}
@Test
public void testReadUsersPartiallyWithOffsetAndLimitedRecordCount() throws IOException, MalformedRecordException {
final int numUsers = 1000025; // intentionally so large, to test input with many record groups
@@ -120,28 +95,6 @@ public class TestParquetReader {
.forEach(i -> assertEquals(ParquetTestUtils.createUser(i + 1000020), convertRecordToUser(results.get(i))));
}
@Test
public void testReadUsersPartiallyWithLimitedRecordCountWithinFileRange()
throws IOException, MalformedRecordException {
final int numUsers = 1000;
final int expectedRecords = 3;
final File parquetFile = ParquetTestUtils.createUsersParquetFile(numUsers);
final List<Record> results = getRecords(
parquetFile,
new HashMap<String, String>() {
{
put(ParquetAttribute.RECORD_COUNT, "3");
put(ParquetAttribute.FILE_RANGE_START_OFFSET, "16543");
put(ParquetAttribute.FILE_RANGE_END_OFFSET, "24784");
}
}
);
assertEquals(expectedRecords, results.size());
IntStream.range(0, expectedRecords)
.forEach(i -> assertEquals(ParquetTestUtils.createUser(i + 663), convertRecordToUser(results.get(i))));
}
@Test
public void testReadUsersPartiallyWithOffsetWithinFileRange() throws IOException, MalformedRecordException {
final int numUsers = 1000;
@@ -213,25 +166,6 @@ public class TestParquetReader {
"MapRecord[{name=Bob9, favorite_number=9, favorite_color=blue9}]");
}
@Test
public void testPartialReaderWithLimitedRecordCount() throws InitializationException, IOException {
final TestRunner runner = TestRunners.newTestRunner(TestParquetProcessor.class);
final ParquetReader parquetReader = new ParquetReader();
runner.addControllerService("reader", parquetReader);
runner.enableControllerService(parquetReader);
runner.enqueue(Paths.get(PARQUET_PATH), singletonMap(ParquetAttribute.RECORD_COUNT, "2"));
runner.setProperty(TestParquetProcessor.READER, "reader");
runner.run();
runner.assertAllFlowFilesTransferred(TestParquetProcessor.SUCCESS, 1);
runner.getFlowFilesForRelationship(TestParquetProcessor.SUCCESS).get(0).assertContentEquals(
"MapRecord[{name=Bob0, favorite_number=0, favorite_color=blue0}]\n" +
"MapRecord[{name=Bob1, favorite_number=1, favorite_color=blue1}]");
}
@Test
public void testPartialReaderWithOffsetAndLimitedRecordCount() throws InitializationException, IOException {
final TestRunner runner = TestRunners.newTestRunner(TestParquetProcessor.class);
@@ -256,30 +190,6 @@ public class TestParquetReader {
"MapRecord[{name=Bob7, favorite_number=7, favorite_color=blue7}]");
}
@Test
public void testPartialReaderWithOffsetOnly() throws InitializationException, IOException {
final TestRunner runner = TestRunners.newTestRunner(TestParquetProcessor.class);
final ParquetReader parquetReader = new ParquetReader();
runner.addControllerService("reader", parquetReader);
runner.enableControllerService(parquetReader);
runner.enqueue(Paths.get(PARQUET_PATH), singletonMap(ParquetAttribute.RECORD_OFFSET, "3"));
runner.setProperty(TestParquetProcessor.READER, "reader");
runner.run();
runner.assertAllFlowFilesTransferred(TestParquetProcessor.SUCCESS, 1);
runner.getFlowFilesForRelationship(TestParquetProcessor.SUCCESS).get(0).assertContentEquals(
"MapRecord[{name=Bob3, favorite_number=3, favorite_color=blue3}]\n" +
"MapRecord[{name=Bob4, favorite_number=4, favorite_color=blue4}]\n" +
"MapRecord[{name=Bob5, favorite_number=5, favorite_color=blue5}]\n" +
"MapRecord[{name=Bob6, favorite_number=6, favorite_color=blue6}]\n" +
"MapRecord[{name=Bob7, favorite_number=7, favorite_color=blue7}]\n" +
"MapRecord[{name=Bob8, favorite_number=8, favorite_color=blue8}]\n" +
"MapRecord[{name=Bob9, favorite_number=9, favorite_color=blue9}]");
}
private List<Record> getRecords(File parquetFile, Map<String, String> variables)
throws IOException, MalformedRecordException {
final List<Record> results = new ArrayList<>();