diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/EmptyRecordReader.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/EmptyRecordReader.java index d4487ab2e5..6c717788ca 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/EmptyRecordReader.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/EmptyRecordReader.java @@ -69,7 +69,7 @@ public class EmptyRecordReader implements RecordReader { @Override public long getMaxEventId() throws IOException { - return 0; + return -1; } @Override diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/store/TestWriteAheadStorePartition.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/store/TestWriteAheadStorePartition.java index c17bafb1c4..28c8deee86 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/store/TestWriteAheadStorePartition.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/store/TestWriteAheadStorePartition.java @@ -18,9 +18,11 @@ package org.apache.nifi.provenance.store; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import java.io.File; import java.io.IOException; +import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; @@ -42,6 +44,7 @@ import org.apache.nifi.provenance.serialization.StorageSummary; import org.apache.nifi.provenance.toc.StandardTocWriter; import org.apache.nifi.provenance.toc.TocUtil; import org.apache.nifi.provenance.toc.TocWriter; +import org.apache.nifi.provenance.util.DirectoryUtils; import org.junit.Test; import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; @@ -97,6 +100,46 @@ public class TestWriteAheadStorePartition { } } + @Test + public void testInitEmptyFile() throws IOException { + final RepositoryConfiguration repoConfig = createConfig(1, "testInitEmptyFile"); + repoConfig.setMaxEventFileCount(5); + + final String partitionName = repoConfig.getStorageDirectories().keySet().iterator().next(); + final File storageDirectory = repoConfig.getStorageDirectories().values().iterator().next(); + + final RecordWriterFactory recordWriterFactory = (file, idGenerator, compressed, createToc) -> { + final TocWriter tocWriter = createToc ? new StandardTocWriter(TocUtil.getTocFile(file), false, false) : null; + return new EventIdFirstSchemaRecordWriter(file, idGenerator, tocWriter, compressed, 32 * 1024, IdentifierLookup.EMPTY); + }; + + final RecordReaderFactory recordReaderFactory = (file, logs, maxChars) -> RecordReaders.newRecordReader(file, logs, maxChars); + + WriteAheadStorePartition partition = new WriteAheadStorePartition(storageDirectory, partitionName, repoConfig, recordWriterFactory, + recordReaderFactory, new LinkedBlockingQueue<>(), new AtomicLong(0L), EventReporter.NO_OP); + + for (int i = 0; i < 100; i++) { + partition.addEvents(Collections.singleton(TestUtil.createEvent())); + } + + long maxEventId = partition.getMaxEventId(); + assertTrue(maxEventId > 0); + partition.close(); + + final List fileList = Arrays.asList(storageDirectory.listFiles(DirectoryUtils.EVENT_FILE_FILTER)); + Collections.sort(fileList, DirectoryUtils.LARGEST_ID_FIRST); + + // Create new empty prov file with largest id + assertTrue(new File(storageDirectory, "1" + fileList.get(0).getName()).createNewFile()); + + partition = new WriteAheadStorePartition(storageDirectory, partitionName, repoConfig, recordWriterFactory, + recordReaderFactory, new LinkedBlockingQueue<>(), new AtomicLong(0L), EventReporter.NO_OP); + + partition.initialize(); + + assertEquals(maxEventId, partition.getMaxEventId()); + } + private RepositoryConfiguration createConfig(final int numStorageDirs, final String testName) { final RepositoryConfiguration config = new RepositoryConfiguration(); final File storageDir = new File("target/storage/" + testName + "/" + UUID.randomUUID().toString());