NIFI-6226: Returning -1 for EmptyRecordReader.getMaxEventId() to prevent potential provenance repo corruption

This closes #3443.

Signed-off-by: Mark Payne <markap14@hotmail.com>
This commit is contained in:
Bryan Rosander 2019-04-18 13:43:05 -04:00 committed by Mark Payne
parent 5e2559db42
commit d429470cc1
2 changed files with 44 additions and 1 deletions

View File

@ -69,7 +69,7 @@ public class EmptyRecordReader implements RecordReader {
@Override
public long getMaxEventId() throws IOException {
return 0;
return -1;
}
@Override

View File

@ -18,9 +18,11 @@
package org.apache.nifi.provenance.store;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@ -42,6 +44,7 @@ import org.apache.nifi.provenance.serialization.StorageSummary;
import org.apache.nifi.provenance.toc.StandardTocWriter;
import org.apache.nifi.provenance.toc.TocUtil;
import org.apache.nifi.provenance.toc.TocWriter;
import org.apache.nifi.provenance.util.DirectoryUtils;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
@ -97,6 +100,46 @@ public class TestWriteAheadStorePartition {
}
}
@Test
public void testInitEmptyFile() throws IOException {
final RepositoryConfiguration repoConfig = createConfig(1, "testInitEmptyFile");
repoConfig.setMaxEventFileCount(5);
final String partitionName = repoConfig.getStorageDirectories().keySet().iterator().next();
final File storageDirectory = repoConfig.getStorageDirectories().values().iterator().next();
final RecordWriterFactory recordWriterFactory = (file, idGenerator, compressed, createToc) -> {
final TocWriter tocWriter = createToc ? new StandardTocWriter(TocUtil.getTocFile(file), false, false) : null;
return new EventIdFirstSchemaRecordWriter(file, idGenerator, tocWriter, compressed, 32 * 1024, IdentifierLookup.EMPTY);
};
final RecordReaderFactory recordReaderFactory = (file, logs, maxChars) -> RecordReaders.newRecordReader(file, logs, maxChars);
WriteAheadStorePartition partition = new WriteAheadStorePartition(storageDirectory, partitionName, repoConfig, recordWriterFactory,
recordReaderFactory, new LinkedBlockingQueue<>(), new AtomicLong(0L), EventReporter.NO_OP);
for (int i = 0; i < 100; i++) {
partition.addEvents(Collections.singleton(TestUtil.createEvent()));
}
long maxEventId = partition.getMaxEventId();
assertTrue(maxEventId > 0);
partition.close();
final List<File> fileList = Arrays.asList(storageDirectory.listFiles(DirectoryUtils.EVENT_FILE_FILTER));
Collections.sort(fileList, DirectoryUtils.LARGEST_ID_FIRST);
// Create new empty prov file with largest id
assertTrue(new File(storageDirectory, "1" + fileList.get(0).getName()).createNewFile());
partition = new WriteAheadStorePartition(storageDirectory, partitionName, repoConfig, recordWriterFactory,
recordReaderFactory, new LinkedBlockingQueue<>(), new AtomicLong(0L), EventReporter.NO_OP);
partition.initialize();
assertEquals(maxEventId, partition.getMaxEventId());
}
private RepositoryConfiguration createConfig(final int numStorageDirs, final String testName) {
final RepositoryConfiguration config = new RepositoryConfiguration();
final File storageDir = new File("target/storage/" + testName + "/" + UUID.randomUUID().toString());