diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/ByteArraySchemaRecordReader.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/ByteArraySchemaRecordReader.java index 297f084c51..bb43ba8d32 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/ByteArraySchemaRecordReader.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/ByteArraySchemaRecordReader.java @@ -73,6 +73,9 @@ public class ByteArraySchemaRecordReader extends CompressableRecordReader { final InputStream limitedIn = new LimitingInputStream(in, recordLength); final Record eventRecord = recordReader.readRecord(limitedIn); + if (eventRecord == null) { + return null; + } return EventRecord.getEvent(eventRecord, getFilename(), byteOffset, getMaxAttributeLength()); } diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestSchemaRecordReaderWriter.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestSchemaRecordReaderWriter.java index b9bb85ecd8..aed690b4d6 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestSchemaRecordReaderWriter.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestSchemaRecordReaderWriter.java @@ -20,6 +20,7 @@ package org.apache.nifi.provenance; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import java.io.ByteArrayOutputStream; @@ -164,6 +165,31 @@ public class TestSchemaRecordReaderWriter extends AbstractTestRecordReaderWriter } } + @Test + public void testAddOneRecordReadTwice() throws IOException { + final RecordField unitTestField = new SimpleRecordField("Unit Test Field", FieldType.STRING, Repetition.EXACTLY_ONE); + final Consumer> schemaModifier = fields -> fields.add(unitTestField); + + final Map toAdd = new HashMap<>(); + toAdd.put(unitTestField, "hello"); + + try (final ByteArraySchemaRecordWriter writer = createSchemaWriter(schemaModifier, toAdd)) { + writer.writeHeader(1L); + writer.writeRecord(createEvent(), 3L); + } + + try (final InputStream in = new FileInputStream(journalFile); + final TocReader tocReader = new StandardTocReader(tocFile); + final RecordReader reader = createReader(in, journalFile.getName(), tocReader, 10000)) { + + final ProvenanceEventRecord firstEvent = reader.nextRecord(); + assertNotNull(firstEvent); + + final ProvenanceEventRecord secondEvent = reader.nextRecord(); + assertNull(secondEvent); + } + } + /** * Creates a SchemaRecordWriter that uses a modified schema