mirror of https://github.com/apache/nifi.git
NIFI-9141 Refactored nifi-provenance-provenance-bundle using JUnit 5
NIFI-9141 Added class Timeout annotations This closes #5368 Signed-off-by: Mike Thomsen <mthomsen@apache.org>
This commit is contained in:
parent
f5a421ccb1
commit
2c5cf8ba0a
|
@ -25,18 +25,11 @@ import org.apache.nifi.provenance.toc.TocReader
|
|||
import org.apache.nifi.provenance.toc.TocUtil
|
||||
import org.apache.nifi.provenance.toc.TocWriter
|
||||
import org.apache.nifi.security.kms.KeyProvider
|
||||
import org.apache.nifi.util.file.FileUtils
|
||||
import org.bouncycastle.jce.provider.BouncyCastleProvider
|
||||
import org.bouncycastle.util.encoders.Hex
|
||||
import org.junit.After
|
||||
import org.junit.AfterClass
|
||||
import org.junit.Before
|
||||
import org.junit.BeforeClass
|
||||
import org.junit.ClassRule
|
||||
import org.junit.Test
|
||||
import org.junit.rules.TemporaryFolder
|
||||
import org.junit.runner.RunWith
|
||||
import org.junit.runners.JUnit4
|
||||
import org.junit.jupiter.api.BeforeAll
|
||||
import org.junit.jupiter.api.BeforeEach
|
||||
import org.junit.jupiter.api.Test
|
||||
import org.slf4j.Logger
|
||||
import org.slf4j.LoggerFactory
|
||||
|
||||
|
@ -49,13 +42,11 @@ import java.util.concurrent.atomic.AtomicLong
|
|||
import static groovy.test.GroovyAssert.shouldFail
|
||||
import static org.apache.nifi.provenance.TestUtil.createFlowFile
|
||||
|
||||
@RunWith(JUnit4.class)
|
||||
class EncryptedSchemaRecordReaderWriterTest extends AbstractTestRecordReaderWriter {
|
||||
private static final Logger logger = LoggerFactory.getLogger(EncryptedSchemaRecordReaderWriterTest.class)
|
||||
|
||||
private static final String KEY_HEX_128 = "0123456789ABCDEFFEDCBA9876543210"
|
||||
private static final String KEY_HEX_256 = KEY_HEX_128 * 2
|
||||
private static final String KEY_HEX = isUnlimitedStrengthCryptoAvailable() ? KEY_HEX_256 : KEY_HEX_128
|
||||
private static final String KEY_HEX = KEY_HEX_128
|
||||
private static final String KEY_ID = "K1"
|
||||
|
||||
private static final String TRANSIT_URI = "nifi://unit-test"
|
||||
|
@ -72,25 +63,12 @@ class EncryptedSchemaRecordReaderWriterTest extends AbstractTestRecordReaderWrit
|
|||
private static KeyProvider mockKeyProvider
|
||||
private static ProvenanceEventEncryptor provenanceEventEncryptor = new AESProvenanceEventEncryptor()
|
||||
|
||||
@ClassRule
|
||||
public static TemporaryFolder tempFolder = new TemporaryFolder()
|
||||
|
||||
private static String ORIGINAL_LOG_LEVEL
|
||||
|
||||
@BeforeClass
|
||||
@BeforeAll
|
||||
static void setUpOnce() throws Exception {
|
||||
ORIGINAL_LOG_LEVEL = System.getProperty("org.slf4j.simpleLogger.log.org.apache.nifi.provenance")
|
||||
System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.provenance", "DEBUG")
|
||||
|
||||
Security.addProvider(new BouncyCastleProvider())
|
||||
|
||||
logger.metaClass.methodMissing = { String name, args ->
|
||||
logger.info("[${name?.toUpperCase()}] ${(args as List).join(" ")}")
|
||||
}
|
||||
|
||||
mockKeyProvider = [
|
||||
getKey : { String keyId ->
|
||||
logger.mock("Requesting key ID: ${keyId}")
|
||||
if (keyId == KEY_ID) {
|
||||
new SecretKeySpec(Hex.decode(KEY_HEX), "AES")
|
||||
} else {
|
||||
|
@ -98,48 +76,22 @@ class EncryptedSchemaRecordReaderWriterTest extends AbstractTestRecordReaderWrit
|
|||
}
|
||||
},
|
||||
getAvailableKeyIds: { ->
|
||||
logger.mock("Available key IDs: [${KEY_ID}]")
|
||||
[KEY_ID]
|
||||
},
|
||||
keyExists : { String keyId ->
|
||||
logger.mock("Checking availability of key ID: ${keyId}")
|
||||
keyId == KEY_ID
|
||||
}] as KeyProvider
|
||||
provenanceEventEncryptor.initialize(mockKeyProvider)
|
||||
}
|
||||
|
||||
@Before
|
||||
@BeforeEach
|
||||
void setUp() throws Exception {
|
||||
journalFile = new File("target/storage/${UUID.randomUUID()}/testEventIdFirstSchemaRecordReaderWriter")
|
||||
journalFile = File.createTempFile(getClass().simpleName, ".journal")
|
||||
journalFile.deleteOnExit()
|
||||
tocFile = TocUtil.getTocFile(journalFile)
|
||||
idGenerator.set(0L)
|
||||
}
|
||||
|
||||
@After
|
||||
void tearDown() throws Exception {
|
||||
try {
|
||||
FileUtils.deleteFile(journalFile.getParentFile(), true)
|
||||
} catch (Exception e) {
|
||||
logger.error(e.getMessage())
|
||||
}
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
static void tearDownOnce() throws Exception {
|
||||
if (ORIGINAL_LOG_LEVEL) {
|
||||
System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.provenance", ORIGINAL_LOG_LEVEL)
|
||||
}
|
||||
try {
|
||||
FileUtils.deleteFile(new File("target/storage"), true)
|
||||
} catch (Exception e) {
|
||||
logger.error(e)
|
||||
}
|
||||
}
|
||||
|
||||
private static boolean isUnlimitedStrengthCryptoAvailable() {
|
||||
Cipher.getMaxAllowedKeyLength("AES") > 128
|
||||
}
|
||||
|
||||
private static
|
||||
final FlowFile buildFlowFile(Map attributes = [:], long id = idGenerator.getAndIncrement(), long fileSize = 3000L) {
|
||||
if (!attributes?.uuid) {
|
||||
|
@ -197,7 +149,7 @@ class EncryptedSchemaRecordReaderWriterTest extends AbstractTestRecordReaderWrit
|
|||
logger.info("Generated encrypted writer: ${encryptedWriter}")
|
||||
|
||||
// Act
|
||||
int encryptedRecordId = idGenerator.get()
|
||||
long encryptedRecordId = idGenerator.get()
|
||||
encryptedWriter.writeHeader(encryptedRecordId)
|
||||
encryptedWriter.writeRecords(Collections.singletonList(record))
|
||||
encryptedWriter.close()
|
||||
|
@ -240,13 +192,13 @@ class EncryptedSchemaRecordReaderWriterTest extends AbstractTestRecordReaderWrit
|
|||
logger.info("Generated encrypted writer: ${encryptedWriter}")
|
||||
|
||||
// Act
|
||||
int standardRecordId = idGenerator.get()
|
||||
long standardRecordId = idGenerator.get()
|
||||
standardWriter.writeHeader(standardRecordId)
|
||||
standardWriter.writeRecords(Collections.singletonList(record))
|
||||
standardWriter.close()
|
||||
logger.info("Wrote standard record ${standardRecordId} to journal")
|
||||
|
||||
int encryptedRecordId = idGenerator.get()
|
||||
long encryptedRecordId = idGenerator.get()
|
||||
encryptedWriter.writeHeader(encryptedRecordId)
|
||||
encryptedWriter.writeRecords(Collections.singletonList(record))
|
||||
encryptedWriter.close()
|
||||
|
@ -273,10 +225,6 @@ class EncryptedSchemaRecordReaderWriterTest extends AbstractTestRecordReaderWrit
|
|||
RecordReader incompatibleReader = new EventIdFirstSchemaRecordReader(efis, encryptedJournalFile.getName(), incompatibleTocReader, MAX_ATTRIBUTE_SIZE)
|
||||
logger.info("Generated standard reader (attempting to read encrypted file): ${incompatibleReader}")
|
||||
|
||||
def msg = shouldFail(EOFException) {
|
||||
ProvenanceEventRecord encryptedEvent = incompatibleReader.nextRecord()
|
||||
}
|
||||
logger.expected(msg)
|
||||
assert msg =~ "EOFException: Failed to read field"
|
||||
shouldFail(EOFException) { incompatibleReader.nextRecord() }
|
||||
}
|
||||
}
|
||||
|
|
|
@ -23,33 +23,20 @@ import org.apache.nifi.reporting.Severity
|
|||
import org.apache.nifi.security.kms.StaticKeyProvider
|
||||
import org.apache.nifi.util.file.FileUtils
|
||||
import org.bouncycastle.jce.provider.BouncyCastleProvider
|
||||
import org.junit.After
|
||||
import org.junit.AfterClass
|
||||
import org.junit.Before
|
||||
import org.junit.BeforeClass
|
||||
import org.junit.ClassRule
|
||||
import org.junit.Test
|
||||
import org.junit.Ignore
|
||||
import org.junit.rules.TemporaryFolder
|
||||
import org.junit.runner.RunWith
|
||||
import org.junit.runners.JUnit4
|
||||
import org.slf4j.Logger
|
||||
import org.slf4j.LoggerFactory
|
||||
import org.junit.jupiter.api.AfterEach
|
||||
import org.junit.jupiter.api.BeforeAll
|
||||
import org.junit.jupiter.api.BeforeEach
|
||||
import org.junit.jupiter.api.Test
|
||||
|
||||
import javax.crypto.Cipher
|
||||
import java.security.Security
|
||||
import java.util.concurrent.TimeUnit
|
||||
import java.util.concurrent.atomic.AtomicLong
|
||||
|
||||
import static org.apache.nifi.provenance.TestUtil.createFlowFile
|
||||
|
||||
@RunWith(JUnit4.class)
|
||||
class EncryptedWriteAheadProvenanceRepositoryTest {
|
||||
private static final Logger logger = LoggerFactory.getLogger(EncryptedWriteAheadProvenanceRepositoryTest.class)
|
||||
|
||||
private static final String KEY_HEX_128 = "0123456789ABCDEFFEDCBA9876543210"
|
||||
private static final String KEY_HEX_256 = KEY_HEX_128 * 2
|
||||
private static final String KEY_HEX = isUnlimitedStrengthCryptoAvailable() ? KEY_HEX_256 : KEY_HEX_128
|
||||
private static final String KEY_HEX = KEY_HEX_128
|
||||
private static final String KEY_ID = "K1"
|
||||
|
||||
private static final String TRANSIT_URI = "nifi://unit-test"
|
||||
|
@ -58,37 +45,24 @@ class EncryptedWriteAheadProvenanceRepositoryTest {
|
|||
|
||||
private static final AtomicLong recordId = new AtomicLong()
|
||||
|
||||
@ClassRule
|
||||
public static TemporaryFolder tempFolder = new TemporaryFolder()
|
||||
|
||||
private ProvenanceRepository repo
|
||||
private static RepositoryConfiguration config
|
||||
|
||||
public static final int DEFAULT_ROLLOVER_MILLIS = 2000
|
||||
private EventReporter eventReporter
|
||||
private List<ReportedEvent> reportedEvents = Collections.synchronizedList(new ArrayList<ReportedEvent>())
|
||||
|
||||
private static String ORIGINAL_LOG_LEVEL
|
||||
|
||||
@BeforeClass
|
||||
@BeforeAll
|
||||
static void setUpOnce() throws Exception {
|
||||
ORIGINAL_LOG_LEVEL = System.getProperty("org.slf4j.simpleLogger.log.org.apache.nifi.provenance")
|
||||
System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.provenance", "DEBUG")
|
||||
|
||||
Security.addProvider(new BouncyCastleProvider())
|
||||
|
||||
logger.metaClass.methodMissing = { String name, args ->
|
||||
logger.info("[${name?.toUpperCase()}] ${(args as List).join(" ")}")
|
||||
}
|
||||
}
|
||||
|
||||
@Before
|
||||
@BeforeEach
|
||||
void setUp() throws Exception {
|
||||
reportedEvents?.clear()
|
||||
eventReporter = createMockEventReporter()
|
||||
}
|
||||
|
||||
@After
|
||||
@AfterEach
|
||||
void tearDown() throws Exception {
|
||||
closeRepo(repo, config)
|
||||
|
||||
|
@ -97,20 +71,9 @@ class EncryptedWriteAheadProvenanceRepositoryTest {
|
|||
RecordReaders.isEncryptionAvailable = false
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
static void tearDownOnce() throws Exception {
|
||||
if (ORIGINAL_LOG_LEVEL) {
|
||||
System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.provenance", ORIGINAL_LOG_LEVEL)
|
||||
}
|
||||
}
|
||||
|
||||
private static boolean isUnlimitedStrengthCryptoAvailable() {
|
||||
Cipher.getMaxAllowedKeyLength("AES") > 128
|
||||
}
|
||||
|
||||
private static RepositoryConfiguration createConfiguration() {
|
||||
RepositoryConfiguration config = new RepositoryConfiguration()
|
||||
config.addStorageDirectory("1", new File("target/storage/" + UUID.randomUUID().toString()))
|
||||
config.addStorageDirectory("1", File.createTempDir(getClass().simpleName))
|
||||
config.setCompressOnRollover(true)
|
||||
config.setMaxEventFileLife(2000L, TimeUnit.SECONDS)
|
||||
config.setCompressionBlockBytes(100)
|
||||
|
@ -129,7 +92,6 @@ class EncryptedWriteAheadProvenanceRepositoryTest {
|
|||
[reportEvent: { Severity s, String c, String m ->
|
||||
ReportedEvent event = new ReportedEvent(s, c, m)
|
||||
reportedEvents.add(event)
|
||||
logger.mock("Added ${event}")
|
||||
}] as EventReporter
|
||||
}
|
||||
|
||||
|
@ -235,9 +197,6 @@ class EncryptedWriteAheadProvenanceRepositoryTest {
|
|||
|
||||
final List<ProvenanceEventRecord> recoveredRecords = repo.getEvents(0L, RECORD_COUNT + 1)
|
||||
|
||||
logger.info("Recovered ${recoveredRecords.size()} events: ")
|
||||
recoveredRecords.each { logger.info("\t${it}") }
|
||||
|
||||
// Assert
|
||||
assert recoveredRecords.size() == RECORD_COUNT
|
||||
recoveredRecords.eachWithIndex { ProvenanceEventRecord recovered, int i ->
|
||||
|
@ -249,44 +208,6 @@ class EncryptedWriteAheadProvenanceRepositoryTest {
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
@Ignore("test is unstable. NIFI-5624 to improve it")
|
||||
void testShouldRegisterAndGetEvent() {
|
||||
// Arrange
|
||||
|
||||
// Override the boolean determiner
|
||||
RecordReaders.encryptionPropertiesRead = true
|
||||
RecordReaders.isEncryptionAvailable = true
|
||||
|
||||
config = createEncryptedConfiguration()
|
||||
// Needed until NIFI-3605 is implemented
|
||||
// config.setMaxEventFileCapacity(1L)
|
||||
config.setMaxEventFileCount(1)
|
||||
config.setMaxEventFileLife(1, TimeUnit.SECONDS)
|
||||
repo = new EncryptedWriteAheadProvenanceRepository(config)
|
||||
repo.initialize(eventReporter, null, null, IdentifierLookup.EMPTY)
|
||||
|
||||
Map attributes = ["abc": "This is a plaintext attribute.",
|
||||
"123": "This is another plaintext attribute."]
|
||||
final ProvenanceEventRecord record = buildEventRecord(buildFlowFile(attributes))
|
||||
|
||||
final long LAST_RECORD_ID = repo.getMaxEventId()
|
||||
|
||||
// Act
|
||||
repo.registerEvent(record)
|
||||
|
||||
// Retrieve the event through the interface
|
||||
ProvenanceEventRecord recoveredRecord = repo.getEvent(LAST_RECORD_ID + 1)
|
||||
logger.info("Recovered ${recoveredRecord}")
|
||||
|
||||
// Assert
|
||||
assert recoveredRecord.getEventId() == LAST_RECORD_ID + 1
|
||||
assert recoveredRecord.getTransitUri() == TRANSIT_URI
|
||||
assert recoveredRecord.getEventType() == ProvenanceEventType.RECEIVE
|
||||
// The UUID was added later but we care that all attributes we provided are still there
|
||||
assert recoveredRecord.getAttributes().entrySet().containsAll(attributes.entrySet())
|
||||
}
|
||||
|
||||
@Test
|
||||
void testShouldRegisterAndGetEvents() {
|
||||
// Arrange
|
||||
|
@ -310,17 +231,14 @@ class EncryptedWriteAheadProvenanceRepositoryTest {
|
|||
RECORD_COUNT.times { int i ->
|
||||
records << buildEventRecord(buildFlowFile(attributes + [count: i as String]))
|
||||
}
|
||||
logger.info("Generated ${RECORD_COUNT} records")
|
||||
|
||||
final long LAST_RECORD_ID = repo.getMaxEventId()
|
||||
|
||||
// Act
|
||||
repo.registerEvents(records)
|
||||
logger.info("Registered events")
|
||||
|
||||
// Retrieve the events through the interface
|
||||
List<ProvenanceEventRecord> recoveredRecords = repo.getEvents(LAST_RECORD_ID + 1, RECORD_COUNT * 2)
|
||||
logger.info("Recovered ${recoveredRecords.size()} records")
|
||||
|
||||
// Assert
|
||||
recoveredRecords.eachWithIndex { ProvenanceEventRecord recoveredRecord, int i ->
|
||||
|
|
|
@ -25,8 +25,7 @@ import org.apache.nifi.provenance.toc.TocReader;
|
|||
import org.apache.nifi.provenance.toc.TocUtil;
|
||||
import org.apache.nifi.provenance.toc.TocWriter;
|
||||
import org.apache.nifi.util.file.FileUtils;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileInputStream;
|
||||
|
@ -39,26 +38,21 @@ import java.util.Map;
|
|||
import java.util.Optional;
|
||||
import java.util.UUID;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
|
||||
public abstract class AbstractTestRecordReaderWriter {
|
||||
@BeforeClass
|
||||
public static void setLogLevel() {
|
||||
System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.provenance", "INFO");
|
||||
}
|
||||
|
||||
protected ProvenanceEventRecord createEvent() {
|
||||
return TestUtil.createEvent();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSimpleWriteWithToc() throws IOException {
|
||||
final File journalFile = new File("target/storage/" + UUID.randomUUID().toString() + "/testSimpleWrite");
|
||||
final File journalFile = new File("target/storage/" + UUID.randomUUID() + "/testSimpleWrite");
|
||||
final File tocFile = TocUtil.getTocFile(journalFile);
|
||||
final TocWriter tocWriter = new StandardTocWriter(tocFile, false, false);
|
||||
final RecordWriter writer = createWriter(journalFile, tocWriter, false, 1024 * 1024);
|
||||
|
@ -92,7 +86,7 @@ public abstract class AbstractTestRecordReaderWriter {
|
|||
|
||||
@Test
|
||||
public void testSingleRecordCompressed() throws IOException {
|
||||
final File journalFile = new File("target/storage/" + UUID.randomUUID().toString() + "/testSimpleWrite.gz");
|
||||
final File journalFile = new File("target/storage/" + UUID.randomUUID() + "/testSimpleWrite.gz");
|
||||
final File tocFile = TocUtil.getTocFile(journalFile);
|
||||
final TocWriter tocWriter = new StandardTocWriter(tocFile, false, false);
|
||||
final RecordWriter writer = createWriter(journalFile, tocWriter, true, 8192);
|
||||
|
@ -111,7 +105,7 @@ public abstract class AbstractTestRecordReaderWriter {
|
|||
|
||||
@Test
|
||||
public void testMultipleRecordsSameBlockCompressed() throws IOException {
|
||||
final File journalFile = new File("target/storage/" + UUID.randomUUID().toString() + "/testSimpleWrite.gz");
|
||||
final File journalFile = new File("target/storage/" + UUID.randomUUID() + "/testSimpleWrite.gz");
|
||||
final File tocFile = TocUtil.getTocFile(journalFile);
|
||||
final TocWriter tocWriter = new StandardTocWriter(tocFile, false, false);
|
||||
// new record each 1 MB of uncompressed data
|
||||
|
@ -150,7 +144,7 @@ public abstract class AbstractTestRecordReaderWriter {
|
|||
|
||||
@Test
|
||||
public void testMultipleRecordsMultipleBlocksCompressed() throws IOException {
|
||||
final File journalFile = new File("target/storage/" + UUID.randomUUID().toString() + "/testSimpleWrite.gz");
|
||||
final File journalFile = new File("target/storage/" + UUID.randomUUID() + "/testSimpleWrite.gz");
|
||||
final File tocFile = TocUtil.getTocFile(journalFile);
|
||||
final TocWriter tocWriter = new StandardTocWriter(tocFile, false, false);
|
||||
// new block each 10 bytes
|
||||
|
@ -188,7 +182,7 @@ public abstract class AbstractTestRecordReaderWriter {
|
|||
|
||||
@Test
|
||||
public void testSkipToEvent() throws IOException {
|
||||
final File journalFile = new File("target/storage/" + UUID.randomUUID().toString() + "/testSimpleWrite.gz");
|
||||
final File journalFile = new File("target/storage/" + UUID.randomUUID() + "/testSimpleWrite.gz");
|
||||
final File tocFile = TocUtil.getTocFile(journalFile);
|
||||
final TocWriter tocWriter = new StandardTocWriter(tocFile, false, false);
|
||||
// new block each 10 bytes
|
||||
|
|
|
@ -49,20 +49,16 @@ import org.apache.nifi.provenance.serialization.RecordWriters;
|
|||
import org.apache.nifi.reporting.Severity;
|
||||
import org.apache.nifi.util.NiFiProperties;
|
||||
import org.apache.nifi.util.file.FileUtils;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.TemporaryFolder;
|
||||
import org.junit.rules.TestName;
|
||||
|
||||
import org.junit.jupiter.api.BeforeAll;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.AfterEach;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Timeout;
|
||||
import org.junit.jupiter.api.condition.EnabledIfSystemProperty;
|
||||
|
||||
import java.io.DataOutputStream;
|
||||
import java.io.File;
|
||||
import java.io.FileFilter;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.FileOutputStream;
|
||||
import java.io.IOException;
|
||||
|
@ -88,27 +84,23 @@ import java.util.stream.Collectors;
|
|||
import java.util.zip.GZIPOutputStream;
|
||||
|
||||
import static org.apache.nifi.provenance.TestUtil.createFlowFile;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assume.assumeFalse;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertSame;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
import static org.mockito.Mockito.mock;
|
||||
|
||||
@Timeout(value = 10)
|
||||
public class ITestPersistentProvenanceRepository {
|
||||
|
||||
@Rule
|
||||
public TestName name = new TestName();
|
||||
|
||||
@ClassRule
|
||||
public static TemporaryFolder tempFolder = new TemporaryFolder();
|
||||
|
||||
private PersistentProvenanceRepository repo;
|
||||
private static RepositoryConfiguration config;
|
||||
|
||||
public static final int DEFAULT_ROLLOVER_MILLIS = 2000;
|
||||
private EventReporter eventReporter;
|
||||
private List<ReportedEvent> reportedEvents = Collections.synchronizedList(new ArrayList<ReportedEvent>());
|
||||
private final List<ReportedEvent> reportedEvents = Collections.synchronizedList(new ArrayList<>());
|
||||
|
||||
private static int headerSize;
|
||||
private static int recordSize;
|
||||
|
@ -116,19 +108,14 @@ public class ITestPersistentProvenanceRepository {
|
|||
|
||||
private static RepositoryConfiguration createConfiguration() {
|
||||
config = new RepositoryConfiguration();
|
||||
config.addStorageDirectory("1", new File("target/storage/" + UUID.randomUUID().toString()));
|
||||
config.addStorageDirectory("1", new File("target/storage/" + UUID.randomUUID()));
|
||||
config.setCompressOnRollover(true);
|
||||
config.setMaxEventFileLife(2000L, TimeUnit.SECONDS);
|
||||
config.setCompressionBlockBytes(100);
|
||||
return config;
|
||||
}
|
||||
|
||||
@BeforeClass
|
||||
public static void setLogLevel() {
|
||||
System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.provenance", "DEBUG");
|
||||
}
|
||||
|
||||
@BeforeClass
|
||||
@BeforeAll
|
||||
public static void findJournalSizes() throws IOException {
|
||||
// determine header and record size
|
||||
|
||||
|
@ -145,7 +132,8 @@ public class ITestPersistentProvenanceRepository {
|
|||
builder.setComponentId("2345");
|
||||
final ProvenanceEventRecord record2 = builder.build();
|
||||
|
||||
final File tempRecordFile = tempFolder.newFile("record.tmp");
|
||||
final File tempRecordFile = File.createTempFile("ProvenanceRepository", ".record");
|
||||
tempRecordFile.deleteOnExit();
|
||||
System.out.println("findJournalSizes position 0 = " + tempRecordFile.length());
|
||||
|
||||
final AtomicLong idGenerator = new AtomicLong(0L);
|
||||
|
@ -166,10 +154,8 @@ public class ITestPersistentProvenanceRepository {
|
|||
System.out.println("recordSize2=" + recordSize2);
|
||||
}
|
||||
|
||||
@Before
|
||||
@BeforeEach
|
||||
public void printTestName() {
|
||||
System.out.println("\n\n\n*********************** " + name.getMethodName() + " *****************************");
|
||||
|
||||
reportedEvents.clear();
|
||||
eventReporter = new EventReporter() {
|
||||
private static final long serialVersionUID = 1L;
|
||||
|
@ -182,7 +168,7 @@ public class ITestPersistentProvenanceRepository {
|
|||
};
|
||||
}
|
||||
|
||||
@After
|
||||
@AfterEach
|
||||
public void closeRepo() throws IOException {
|
||||
if (repo == null) {
|
||||
return;
|
||||
|
@ -210,7 +196,7 @@ public class ITestPersistentProvenanceRepository {
|
|||
throw ioe;
|
||||
} else {
|
||||
try {
|
||||
System.out.println("file: " + storageDir.toString() + " exists=" + storageDir.exists());
|
||||
System.out.println("file: " + storageDir + " exists=" + storageDir.exists());
|
||||
FileUtils.deleteFile(storageDir, true);
|
||||
break;
|
||||
} catch (final IOException ioe2) {
|
||||
|
@ -237,7 +223,7 @@ public class ITestPersistentProvenanceRepository {
|
|||
}
|
||||
|
||||
@Test
|
||||
@Ignore("For local testing of performance only")
|
||||
@EnabledIfSystemProperty(named = "nifi.test.performance", matches = "true")
|
||||
public void testPerformance() throws IOException, InterruptedException {
|
||||
final RepositoryConfiguration config = createConfiguration();
|
||||
config.setMaxEventFileCapacity(1024 * 1024 * 1024L);
|
||||
|
@ -352,7 +338,6 @@ public class ITestPersistentProvenanceRepository {
|
|||
|
||||
@Test
|
||||
public void testAddAndRecover() throws IOException, InterruptedException {
|
||||
assumeFalse(isWindowsEnvironment());
|
||||
final RepositoryConfiguration config = createConfiguration();
|
||||
config.setMaxEventFileCapacity(1L);
|
||||
config.setMaxEventFileLife(1, TimeUnit.SECONDS);
|
||||
|
@ -399,7 +384,6 @@ public class ITestPersistentProvenanceRepository {
|
|||
|
||||
@Test
|
||||
public void testAddToMultipleLogsAndRecover() throws IOException, InterruptedException {
|
||||
assumeFalse(isWindowsEnvironment());
|
||||
final List<SearchableField> searchableFields = new ArrayList<>();
|
||||
searchableFields.add(SearchableFields.ComponentID);
|
||||
|
||||
|
@ -458,7 +442,6 @@ public class ITestPersistentProvenanceRepository {
|
|||
|
||||
@Test
|
||||
public void testIndexOnRolloverWithImmenseAttribute() throws IOException {
|
||||
assumeFalse(isWindowsEnvironment());
|
||||
final RepositoryConfiguration config = createConfiguration();
|
||||
config.setMaxEventFileLife(500, TimeUnit.MILLISECONDS);
|
||||
config.setSearchableFields(new ArrayList<>(SearchableFields.getStandardFields()));
|
||||
|
@ -504,7 +487,6 @@ public class ITestPersistentProvenanceRepository {
|
|||
|
||||
@Test
|
||||
public void testIndexOnRolloverAndSubsequentSearch() throws IOException, InterruptedException, ParseException {
|
||||
assumeFalse(isWindowsEnvironment());
|
||||
final RepositoryConfiguration config = createConfiguration();
|
||||
config.setMaxEventFileLife(500, TimeUnit.MILLISECONDS);
|
||||
config.setSearchableFields(new ArrayList<>(SearchableFields.getStandardFields()));
|
||||
|
@ -549,7 +531,6 @@ public class ITestPersistentProvenanceRepository {
|
|||
|
||||
@Test
|
||||
public void testCompressOnRollover() throws IOException, InterruptedException, ParseException {
|
||||
assumeFalse(isWindowsEnvironment());
|
||||
final RepositoryConfiguration config = createConfiguration();
|
||||
config.setMaxEventFileLife(500, TimeUnit.MILLISECONDS);
|
||||
config.setCompressOnRollover(true);
|
||||
|
@ -584,7 +565,6 @@ public class ITestPersistentProvenanceRepository {
|
|||
|
||||
@Test
|
||||
public void testIndexAndCompressOnRolloverAndSubsequentSearch() throws IOException, InterruptedException, ParseException {
|
||||
assumeFalse(isWindowsEnvironment());
|
||||
final RepositoryConfiguration config = createConfiguration();
|
||||
config.setMaxRecordLife(30, TimeUnit.SECONDS);
|
||||
config.setMaxStorageCapacity(1024L * 1024L * 10);
|
||||
|
@ -645,9 +625,8 @@ public class ITestPersistentProvenanceRepository {
|
|||
|
||||
@Test
|
||||
public void testIndexAndCompressOnRolloverAndSubsequentSearchMultipleStorageDirs() throws IOException, InterruptedException, ParseException {
|
||||
assumeFalse(isWindowsEnvironment());
|
||||
final RepositoryConfiguration config = createConfiguration();
|
||||
config.addStorageDirectory("2", new File("target/storage/" + UUID.randomUUID().toString()));
|
||||
config.addStorageDirectory("2", new File("target/storage/" + UUID.randomUUID()));
|
||||
config.setMaxRecordLife(30, TimeUnit.SECONDS);
|
||||
config.setMaxStorageCapacity(1024L * 1024L);
|
||||
config.setMaxEventFileLife(1, TimeUnit.SECONDS);
|
||||
|
@ -734,7 +713,6 @@ public class ITestPersistentProvenanceRepository {
|
|||
|
||||
@Test
|
||||
public void testIndexAndCompressOnRolloverAndSubsequentEmptySearch() throws IOException, InterruptedException, ParseException {
|
||||
assumeFalse(isWindowsEnvironment());
|
||||
final RepositoryConfiguration config = createConfiguration();
|
||||
config.setMaxRecordLife(30, TimeUnit.SECONDS);
|
||||
config.setMaxStorageCapacity(1024L * 1024L);
|
||||
|
@ -791,7 +769,6 @@ public class ITestPersistentProvenanceRepository {
|
|||
|
||||
@Test
|
||||
public void testLineageReceiveDrop() throws IOException, InterruptedException, ParseException {
|
||||
assumeFalse(isWindowsEnvironment());
|
||||
final RepositoryConfiguration config = createConfiguration();
|
||||
config.setMaxRecordLife(3, TimeUnit.SECONDS);
|
||||
config.setMaxStorageCapacity(1024L * 1024L);
|
||||
|
@ -836,18 +813,17 @@ public class ITestPersistentProvenanceRepository {
|
|||
|
||||
for (final LineageEdge edge : edges) {
|
||||
if (edge.getSource().getNodeType() == LineageNodeType.FLOWFILE_NODE) {
|
||||
assertTrue(edge.getDestination().getNodeType() == LineageNodeType.PROVENANCE_EVENT_NODE);
|
||||
assertTrue(((EventNode) edge.getDestination()).getEventType() == ProvenanceEventType.DROP);
|
||||
assertSame(edge.getDestination().getNodeType(), LineageNodeType.PROVENANCE_EVENT_NODE);
|
||||
assertSame(((EventNode) edge.getDestination()).getEventType(), ProvenanceEventType.DROP);
|
||||
} else {
|
||||
assertTrue(((EventNode) edge.getSource()).getEventType() == ProvenanceEventType.RECEIVE);
|
||||
assertTrue(edge.getDestination().getNodeType() == LineageNodeType.FLOWFILE_NODE);
|
||||
assertSame(((EventNode) edge.getSource()).getEventType(), ProvenanceEventType.RECEIVE);
|
||||
assertSame(edge.getDestination().getNodeType(), LineageNodeType.FLOWFILE_NODE);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testLineageReceiveDropAsync() throws IOException, InterruptedException, ParseException {
|
||||
assumeFalse(isWindowsEnvironment());
|
||||
final RepositoryConfiguration config = createConfiguration();
|
||||
config.setMaxRecordLife(3, TimeUnit.SECONDS);
|
||||
config.setMaxStorageCapacity(1024L * 1024L);
|
||||
|
@ -896,18 +872,17 @@ public class ITestPersistentProvenanceRepository {
|
|||
|
||||
for (final LineageEdge edge : edges) {
|
||||
if (edge.getSource().getNodeType() == LineageNodeType.FLOWFILE_NODE) {
|
||||
assertTrue(edge.getDestination().getNodeType() == LineageNodeType.PROVENANCE_EVENT_NODE);
|
||||
assertTrue(((EventNode) edge.getDestination()).getEventType() == ProvenanceEventType.DROP);
|
||||
assertSame(edge.getDestination().getNodeType(), LineageNodeType.PROVENANCE_EVENT_NODE);
|
||||
assertSame(((EventNode) edge.getDestination()).getEventType(), ProvenanceEventType.DROP);
|
||||
} else {
|
||||
assertTrue(((EventNode) edge.getSource()).getEventType() == ProvenanceEventType.RECEIVE);
|
||||
assertTrue(edge.getDestination().getNodeType() == LineageNodeType.FLOWFILE_NODE);
|
||||
assertSame(((EventNode) edge.getSource()).getEventType(), ProvenanceEventType.RECEIVE);
|
||||
assertSame(edge.getDestination().getNodeType(), LineageNodeType.FLOWFILE_NODE);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testLineageManyToOneSpawn() throws IOException, InterruptedException, ParseException {
|
||||
assumeFalse(isWindowsEnvironment());
|
||||
final RepositoryConfiguration config = createConfiguration();
|
||||
config.setMaxRecordLife(3, TimeUnit.SECONDS);
|
||||
config.setMaxStorageCapacity(1024L * 1024L);
|
||||
|
@ -958,7 +933,6 @@ public class ITestPersistentProvenanceRepository {
|
|||
|
||||
@Test
|
||||
public void testLineageManyToOneSpawnAsync() throws IOException, InterruptedException, ParseException {
|
||||
assumeFalse(isWindowsEnvironment());
|
||||
final RepositoryConfiguration config = createConfiguration();
|
||||
config.setMaxRecordLife(3, TimeUnit.SECONDS);
|
||||
config.setMaxStorageCapacity(1024L * 1024L);
|
||||
|
@ -1011,7 +985,6 @@ public class ITestPersistentProvenanceRepository {
|
|||
|
||||
@Test
|
||||
public void testCorrectProvenanceEventIdOnRestore() throws IOException {
|
||||
assumeFalse(isWindowsEnvironment());
|
||||
final RepositoryConfiguration config = createConfiguration();
|
||||
config.setMaxEventFileLife(1, TimeUnit.SECONDS);
|
||||
repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS);
|
||||
|
@ -1061,7 +1034,6 @@ public class ITestPersistentProvenanceRepository {
|
|||
*/
|
||||
@Test
|
||||
public void testWithWithEventFileMissingRecord() throws Exception {
|
||||
assumeFalse(isWindowsEnvironment());
|
||||
File eventFile = this.prepCorruptedEventFileTests();
|
||||
|
||||
final Query query = new Query(UUID.randomUUID().toString());
|
||||
|
@ -1083,7 +1055,6 @@ public class ITestPersistentProvenanceRepository {
|
|||
*/
|
||||
@Test
|
||||
public void testWithWithEventFileCorrupted() throws Exception {
|
||||
assumeFalse(isWindowsEnvironment());
|
||||
File eventFile = this.prepCorruptedEventFileTests();
|
||||
|
||||
final Query query = new Query(UUID.randomUUID().toString());
|
||||
|
@ -1123,92 +1094,8 @@ public class ITestPersistentProvenanceRepository {
|
|||
return eventFile;
|
||||
}
|
||||
|
||||
@Test
|
||||
@Ignore("This test relies too much on timing of background events by using Thread.sleep().")
|
||||
public void testIndexDirectoryRemoved() throws InterruptedException, IOException, ParseException {
|
||||
final RepositoryConfiguration config = createConfiguration();
|
||||
config.setMaxRecordLife(5, TimeUnit.MINUTES);
|
||||
config.setMaxStorageCapacity(1024L * 1024L);
|
||||
config.setMaxEventFileLife(500, TimeUnit.MILLISECONDS);
|
||||
config.setMaxEventFileCapacity(1024L * 1024L);
|
||||
config.setSearchableFields(new ArrayList<>(SearchableFields.getStandardFields()));
|
||||
config.setDesiredIndexSize(10); // force new index to be created for each rollover
|
||||
|
||||
repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS);
|
||||
repo.initialize(getEventReporter(), null, null, IdentifierLookup.EMPTY);
|
||||
|
||||
final String uuid = "00000000-0000-0000-0000-000000000000";
|
||||
final Map<String, String> attributes = new HashMap<>();
|
||||
attributes.put("abc", "xyz");
|
||||
attributes.put("xyz", "abc");
|
||||
attributes.put("filename", "file-" + uuid);
|
||||
|
||||
final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder();
|
||||
builder.setEventTime(System.currentTimeMillis());
|
||||
builder.setEventType(ProvenanceEventType.RECEIVE);
|
||||
builder.setTransitUri("nifi://unit-test");
|
||||
builder.fromFlowFile(createFlowFile(3L, 3000L, attributes));
|
||||
builder.setComponentId("1234");
|
||||
builder.setComponentType("dummy processor");
|
||||
|
||||
for (int i = 0; i < 10; i++) {
|
||||
attributes.put("uuid", "00000000-0000-0000-0000-00000000000" + i);
|
||||
builder.fromFlowFile(createFlowFile(i, 3000L, attributes));
|
||||
builder.setEventTime(10L); // make sure the events are destroyed when we call purge
|
||||
repo.registerEvent(builder.build());
|
||||
}
|
||||
|
||||
repo.waitForRollover();
|
||||
|
||||
Thread.sleep(2000L);
|
||||
|
||||
final FileFilter indexFileFilter = file -> file.getName().startsWith("index");
|
||||
final int numIndexDirs = config.getStorageDirectories().values().iterator().next().listFiles(indexFileFilter).length;
|
||||
assertEquals(1, numIndexDirs);
|
||||
|
||||
// add more records so that we will create a new index
|
||||
final long secondBatchStartTime = System.currentTimeMillis();
|
||||
for (int i = 0; i < 10; i++) {
|
||||
attributes.put("uuid", "00000000-0000-0000-0000-00000000001" + i);
|
||||
builder.fromFlowFile(createFlowFile(i, 3000L, attributes));
|
||||
builder.setEventTime(System.currentTimeMillis());
|
||||
repo.registerEvent(builder.build());
|
||||
}
|
||||
|
||||
// wait for indexing to happen
|
||||
repo.waitForRollover();
|
||||
|
||||
// verify we get the results expected
|
||||
final Query query = new Query(UUID.randomUUID().toString());
|
||||
query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.Filename, "file-*", null));
|
||||
query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.ComponentID, "12?4", null));
|
||||
query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.TransitURI, "nifi://*", null));
|
||||
query.setMaxResults(100);
|
||||
|
||||
final QueryResult result = repo.queryEvents(query, createUser());
|
||||
assertEquals(20, result.getMatchingEvents().size());
|
||||
|
||||
// Ensure index directories exists
|
||||
File[] indexDirs = config.getStorageDirectories().values().iterator().next().listFiles(indexFileFilter);
|
||||
assertEquals(2, indexDirs.length);
|
||||
|
||||
// expire old events and indexes
|
||||
final long timeSinceSecondBatch = System.currentTimeMillis() - secondBatchStartTime;
|
||||
config.setMaxRecordLife(timeSinceSecondBatch + 1000L, TimeUnit.MILLISECONDS);
|
||||
repo.purgeOldEvents();
|
||||
Thread.sleep(2000L);
|
||||
|
||||
final QueryResult newRecordSet = repo.queryEvents(query, createUser());
|
||||
assertEquals(10, newRecordSet.getMatchingEvents().size());
|
||||
|
||||
// Ensure that one index directory is gone
|
||||
indexDirs = config.getStorageDirectories().values().iterator().next().listFiles(indexFileFilter);
|
||||
assertEquals(1, indexDirs.length);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNotAuthorizedGetSpecificEvent() throws IOException {
|
||||
assumeFalse(isWindowsEnvironment());
|
||||
final RepositoryConfiguration config = createConfiguration();
|
||||
config.setMaxRecordLife(5, TimeUnit.MINUTES);
|
||||
config.setMaxStorageCapacity(1024L * 1024L);
|
||||
|
@ -1250,17 +1137,11 @@ public class ITestPersistentProvenanceRepository {
|
|||
|
||||
repo.waitForRollover();
|
||||
|
||||
try {
|
||||
repo.getEvent(0L, null);
|
||||
Assert.fail("getEvent() did not throw an Exception");
|
||||
} catch (final Exception e) {
|
||||
Assert.assertSame(expectedException, e);
|
||||
}
|
||||
assertThrows(expectedException.getClass(), () -> repo.getEvent(0, null));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNotAuthorizedGetEventRange() throws IOException {
|
||||
assumeFalse(isWindowsEnvironment());
|
||||
final RepositoryConfiguration config = createConfiguration();
|
||||
config.setMaxRecordLife(5, TimeUnit.MINUTES);
|
||||
config.setMaxStorageCapacity(1024L * 1024L);
|
||||
|
@ -1307,13 +1188,12 @@ public class ITestPersistentProvenanceRepository {
|
|||
assertEquals(7, events.size());
|
||||
final List<Long> eventIds = events.stream().map(event -> event.getEventId()).sorted().collect(Collectors.toList());
|
||||
for (int i = 0; i < 7; i++) {
|
||||
Assert.assertEquals(i + 3, eventIds.get(i).intValue());
|
||||
assertEquals(i + 3, eventIds.get(i).intValue());
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 10000)
|
||||
@Test
|
||||
public void testNotAuthorizedQuery() throws IOException, InterruptedException {
|
||||
assumeFalse(isWindowsEnvironment());
|
||||
final RepositoryConfiguration config = createConfiguration();
|
||||
config.setMaxRecordLife(5, TimeUnit.MINUTES);
|
||||
config.setMaxStorageCapacity(1024L * 1024L);
|
||||
|
@ -1368,13 +1248,12 @@ public class ITestPersistentProvenanceRepository {
|
|||
assertEquals(7, events.size());
|
||||
final List<Long> eventIds = events.stream().map(event -> event.getEventId()).sorted().collect(Collectors.toList());
|
||||
for (int i = 0; i < 7; i++) {
|
||||
Assert.assertEquals(i + 3, eventIds.get(i).intValue());
|
||||
assertEquals(i + 3, eventIds.get(i).intValue());
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 1000000)
|
||||
@Test
|
||||
public void testNotAuthorizedLineage() throws IOException, InterruptedException {
|
||||
assumeFalse(isWindowsEnvironment());
|
||||
final RepositoryConfiguration config = createConfiguration();
|
||||
config.setMaxRecordLife(5, TimeUnit.MINUTES);
|
||||
config.setMaxStorageCapacity(1024L * 1024L);
|
||||
|
@ -1495,7 +1374,6 @@ public class ITestPersistentProvenanceRepository {
|
|||
|
||||
@Test
|
||||
public void testBackPressure() throws IOException, InterruptedException {
|
||||
assumeFalse(isWindowsEnvironment());
|
||||
final RepositoryConfiguration config = createConfiguration();
|
||||
config.setMaxEventFileCapacity(1L); // force rollover on each record.
|
||||
config.setJournalCount(1);
|
||||
|
@ -1560,7 +1438,6 @@ public class ITestPersistentProvenanceRepository {
|
|||
|
||||
@Test
|
||||
public void testTextualQuery() throws InterruptedException, IOException, ParseException {
|
||||
assumeFalse(isWindowsEnvironment());
|
||||
final RepositoryConfiguration config = createConfiguration();
|
||||
config.setMaxEventFileLife(500, TimeUnit.MILLISECONDS);
|
||||
config.setSearchableFields(new ArrayList<>(SearchableFields.getStandardFields()));
|
||||
|
@ -1606,7 +1483,6 @@ public class ITestPersistentProvenanceRepository {
|
|||
}
|
||||
|
||||
private List<Document> runQuery(final File indexDirectory, final List<File> storageDirs, final String query) throws IOException, ParseException {
|
||||
assumeFalse(isWindowsEnvironment());
|
||||
try (final DirectoryReader directoryReader = DirectoryReader.open(FSDirectory.open(indexDirectory.toPath()))) {
|
||||
final IndexSearcher searcher = new IndexSearcher(directoryReader);
|
||||
|
||||
|
@ -1639,7 +1515,7 @@ public class ITestPersistentProvenanceRepository {
|
|||
ProvenanceEventRecord last = null;
|
||||
while ((r = reader.nextRecord()) != null) {
|
||||
if (exact) {
|
||||
assertTrue(counter++ == r.getEventId());
|
||||
assertEquals(counter++, r.getEventId());
|
||||
} else {
|
||||
assertTrue(counter++ <= r.getEventId());
|
||||
}
|
||||
|
@ -1652,7 +1528,6 @@ public class ITestPersistentProvenanceRepository {
|
|||
|
||||
@Test
|
||||
public void testMergeJournals() throws IOException, InterruptedException {
|
||||
assumeFalse(isWindowsEnvironment());
|
||||
final RepositoryConfiguration config = createConfiguration();
|
||||
config.setMaxEventFileLife(3, TimeUnit.SECONDS);
|
||||
repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS);
|
||||
|
@ -1720,7 +1595,6 @@ public class ITestPersistentProvenanceRepository {
|
|||
|
||||
@Test
|
||||
public void testMergeJournalsBadFirstRecord() throws IOException, InterruptedException {
|
||||
assumeFalse(isWindowsEnvironment());
|
||||
final RepositoryConfiguration config = createConfiguration();
|
||||
config.setMaxEventFileLife(3, TimeUnit.SECONDS);
|
||||
TestablePersistentProvenanceRepository testRepo = new TestablePersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS);
|
||||
|
@ -1767,7 +1641,6 @@ public class ITestPersistentProvenanceRepository {
|
|||
|
||||
@Test
|
||||
public void testMergeJournalsBadRecordAfterFirst() throws IOException, InterruptedException {
|
||||
assumeFalse(isWindowsEnvironment());
|
||||
final RepositoryConfiguration config = createConfiguration();
|
||||
config.setMaxEventFileLife(3, TimeUnit.SECONDS);
|
||||
TestablePersistentProvenanceRepository testRepo = new TestablePersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS);
|
||||
|
@ -1818,7 +1691,6 @@ public class ITestPersistentProvenanceRepository {
|
|||
|
||||
@Test
|
||||
public void testMergeJournalsEmptyJournal() throws IOException, InterruptedException {
|
||||
assumeFalse(isWindowsEnvironment());//skip on window
|
||||
final RepositoryConfiguration config = createConfiguration();
|
||||
config.setMaxEventFileLife(3, TimeUnit.SECONDS);
|
||||
TestablePersistentProvenanceRepository testRepo = new TestablePersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS);
|
||||
|
@ -1857,7 +1729,7 @@ public class ITestPersistentProvenanceRepository {
|
|||
|
||||
testRepo.recoverJournalFiles();
|
||||
|
||||
assertEquals("mergeJournals() should not error on empty journal", 0, reportedEvents.size());
|
||||
assertEquals(0, reportedEvents.size(),"mergeJournals() should not error on empty journal");
|
||||
|
||||
final File storageDir = config.getStorageDirectories().values().iterator().next();
|
||||
assertEquals(config.getJournalCount() - 1, checkJournalRecords(storageDir, true));
|
||||
|
@ -1865,7 +1737,6 @@ public class ITestPersistentProvenanceRepository {
|
|||
|
||||
@Test
|
||||
public void testRolloverRetry() throws IOException, InterruptedException {
|
||||
assumeFalse(isWindowsEnvironment());
|
||||
final AtomicInteger retryAmount = new AtomicInteger(0);
|
||||
final RepositoryConfiguration config = createConfiguration();
|
||||
config.setMaxEventFileLife(3, TimeUnit.SECONDS);
|
||||
|
@ -1920,8 +1791,7 @@ public class ITestPersistentProvenanceRepository {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testTruncateAttributes() throws IOException, InterruptedException {
|
||||
assumeFalse(isWindowsEnvironment());
|
||||
public void testTruncateAttributes() throws IOException {
|
||||
final RepositoryConfiguration config = createConfiguration();
|
||||
config.setMaxAttributeChars(50);
|
||||
config.setMaxEventFileLife(3, TimeUnit.SECONDS);
|
||||
|
@ -1958,9 +1828,8 @@ public class ITestPersistentProvenanceRepository {
|
|||
assertEquals(maxLengthChars.substring(0, 49), retrieved.getAttributes().get("49chars"));
|
||||
}
|
||||
|
||||
@Test(timeout = 15000)
|
||||
@Test
|
||||
public void testExceptionOnIndex() throws IOException {
|
||||
assumeFalse(isWindowsEnvironment());
|
||||
final RepositoryConfiguration config = createConfiguration();
|
||||
config.setMaxAttributeChars(50);
|
||||
config.setMaxEventFileLife(3, TimeUnit.SECONDS);
|
||||
|
@ -2012,8 +1881,7 @@ public class ITestPersistentProvenanceRepository {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testFailureToCreateWriterDoesNotPreventSubsequentRollover() throws IOException, InterruptedException {
|
||||
assumeFalse(isWindowsEnvironment());
|
||||
public void testFailureToCreateWriterDoesNotPreventSubsequentRollover() throws IOException {
|
||||
final RepositoryConfiguration config = createConfiguration();
|
||||
config.setMaxAttributeChars(50);
|
||||
config.setMaxEventFileLife(3, TimeUnit.SECONDS);
|
||||
|
@ -2055,12 +1923,7 @@ public class ITestPersistentProvenanceRepository {
|
|||
}
|
||||
|
||||
// Attempt to rollover but fail to create new writers.
|
||||
try {
|
||||
repo.rolloverWithLock(true);
|
||||
Assert.fail("Expected to get IOException when calling rolloverWithLock");
|
||||
} catch (final IOException ioe) {
|
||||
assertTrue(ioe == failure);
|
||||
}
|
||||
assertThrows(IOException.class, () -> repo.rolloverWithLock(true));
|
||||
|
||||
// Wait for the first rollover to succeed.
|
||||
repo.waitForRollover();
|
||||
|
|
|
@ -18,7 +18,8 @@ package org.apache.nifi.provenance;
|
|||
|
||||
import org.apache.nifi.authorization.Authorizer;
|
||||
import org.apache.nifi.events.EventReporter;
|
||||
import org.junit.Test;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.Timeout;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import java.io.File;
|
||||
|
@ -30,17 +31,18 @@ import java.util.Arrays;
|
|||
import java.util.Collections;
|
||||
import java.util.UUID;
|
||||
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
/**
|
||||
* With NiFi 1.10.0 (?) we changed from Lucene 4.x to Lucene 8.x
|
||||
* This test is intended to ensure that we can properly startup even when pointing to a Provenance
|
||||
* Repository that was created against the old Lucene.
|
||||
*/
|
||||
@Timeout(value = 5)
|
||||
public class StartupAgainstOldLuceneIndexIT {
|
||||
|
||||
@Test(timeout = 30000)
|
||||
@Test
|
||||
public void testStartup() throws IOException, InterruptedException {
|
||||
// Test startup with old lucene 4 index directory and no temp or migrated directory.
|
||||
testStartup(false, false);
|
||||
|
@ -54,7 +56,7 @@ public class StartupAgainstOldLuceneIndexIT {
|
|||
|
||||
private void testStartup(final boolean createTempDirectory, final boolean createMigratedDirectory) throws IOException, InterruptedException {
|
||||
final File existingRepo = new File("src/test/resources/lucene-4-prov-repo");
|
||||
final File tempDir = new File("target/" + UUID.randomUUID().toString());
|
||||
final File tempDir = new File("target/" + UUID.randomUUID());
|
||||
|
||||
copy(existingRepo, tempDir);
|
||||
final File oldIndexDir = new File(tempDir, "index-1554304717707");
|
||||
|
|
|
@ -25,11 +25,9 @@ import org.apache.nifi.provenance.toc.TocReader;
|
|||
import org.apache.nifi.provenance.toc.TocUtil;
|
||||
import org.apache.nifi.provenance.toc.TocWriter;
|
||||
import org.apache.nifi.util.file.FileUtils;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.condition.EnabledIfSystemProperty;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileInputStream;
|
||||
|
@ -44,30 +42,25 @@ import java.util.concurrent.Callable;
|
|||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertNull;
|
||||
|
||||
public class TestEventIdFirstSchemaRecordReaderWriter extends AbstractTestRecordReaderWriter {
|
||||
private final AtomicLong idGenerator = new AtomicLong(0L);
|
||||
private File journalFile;
|
||||
private File tocFile;
|
||||
|
||||
@BeforeClass
|
||||
public static void setupLogger() {
|
||||
System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi", "DEBUG");
|
||||
}
|
||||
|
||||
@Before
|
||||
@BeforeEach
|
||||
public void setup() {
|
||||
journalFile = new File("target/storage/" + UUID.randomUUID().toString() + "/testEventIdFirstSchemaRecordReaderWriter");
|
||||
journalFile = new File("target/storage/" + UUID.randomUUID() + "/testEventIdFirstSchemaRecordReaderWriter");
|
||||
tocFile = TocUtil.getTocFile(journalFile);
|
||||
idGenerator.set(0L);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testContentClaimUnchanged() throws IOException {
|
||||
final File journalFile = new File("target/storage/" + UUID.randomUUID().toString() + "/testSimpleWrite.gz");
|
||||
final File journalFile = new File("target/storage/" + UUID.randomUUID() + "/testSimpleWrite.gz");
|
||||
final File tocFile = TocUtil.getTocFile(journalFile);
|
||||
final TocWriter tocWriter = new StandardTocWriter(tocFile, false, false);
|
||||
final RecordWriter writer = createWriter(journalFile, tocWriter, true, 8192);
|
||||
|
@ -125,7 +118,7 @@ public class TestEventIdFirstSchemaRecordReaderWriter extends AbstractTestRecord
|
|||
|
||||
@Test
|
||||
public void testContentClaimRemoved() throws IOException {
|
||||
final File journalFile = new File("target/storage/" + UUID.randomUUID().toString() + "/testSimpleWrite.gz");
|
||||
final File journalFile = new File("target/storage/" + UUID.randomUUID() + "/testSimpleWrite.gz");
|
||||
final File tocFile = TocUtil.getTocFile(journalFile);
|
||||
final TocWriter tocWriter = new StandardTocWriter(tocFile, false, false);
|
||||
final RecordWriter writer = createWriter(journalFile, tocWriter, true, 8192);
|
||||
|
@ -183,7 +176,7 @@ public class TestEventIdFirstSchemaRecordReaderWriter extends AbstractTestRecord
|
|||
|
||||
@Test
|
||||
public void testContentClaimAdded() throws IOException {
|
||||
final File journalFile = new File("target/storage/" + UUID.randomUUID().toString() + "/testSimpleWrite.gz");
|
||||
final File journalFile = new File("target/storage/" + UUID.randomUUID() + "/testSimpleWrite.gz");
|
||||
final File tocFile = TocUtil.getTocFile(journalFile);
|
||||
final TocWriter tocWriter = new StandardTocWriter(tocFile, false, false);
|
||||
final RecordWriter writer = createWriter(journalFile, tocWriter, true, 8192);
|
||||
|
@ -240,7 +233,7 @@ public class TestEventIdFirstSchemaRecordReaderWriter extends AbstractTestRecord
|
|||
|
||||
@Test
|
||||
public void testContentClaimChanged() throws IOException {
|
||||
final File journalFile = new File("target/storage/" + UUID.randomUUID().toString() + "/testSimpleWrite.gz");
|
||||
final File journalFile = new File("target/storage/" + UUID.randomUUID() + "/testSimpleWrite.gz");
|
||||
final File tocFile = TocUtil.getTocFile(journalFile);
|
||||
final TocWriter tocWriter = new StandardTocWriter(tocFile, false, false);
|
||||
final RecordWriter writer = createWriter(journalFile, tocWriter, true, 8192);
|
||||
|
@ -298,7 +291,7 @@ public class TestEventIdFirstSchemaRecordReaderWriter extends AbstractTestRecord
|
|||
|
||||
@Test
|
||||
public void testEventIdAndTimestampCorrect() throws IOException {
|
||||
final File journalFile = new File("target/storage/" + UUID.randomUUID().toString() + "/testSimpleWrite.gz");
|
||||
final File journalFile = new File("target/storage/" + UUID.randomUUID() + "/testSimpleWrite.gz");
|
||||
final File tocFile = TocUtil.getTocFile(journalFile);
|
||||
final TocWriter tocWriter = new StandardTocWriter(tocFile, false, false);
|
||||
final RecordWriter writer = createWriter(journalFile, tocWriter, true, 8192);
|
||||
|
@ -340,10 +333,9 @@ public class TestEventIdFirstSchemaRecordReaderWriter extends AbstractTestRecord
|
|||
FileUtils.deleteFile(journalFile.getParentFile(), true);
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testComponentIdInlineAndLookup() throws IOException {
|
||||
final File journalFile = new File("target/storage/" + UUID.randomUUID().toString() + "/testSimpleWrite.prov");
|
||||
final File journalFile = new File("target/storage/" + UUID.randomUUID() + "/testSimpleWrite.prov");
|
||||
final File tocFile = TocUtil.getTocFile(journalFile);
|
||||
final TocWriter tocWriter = new StandardTocWriter(tocFile, false, false);
|
||||
|
||||
|
@ -425,9 +417,9 @@ public class TestEventIdFirstSchemaRecordReaderWriter extends AbstractTestRecord
|
|||
}
|
||||
|
||||
@Test
|
||||
@Ignore
|
||||
@EnabledIfSystemProperty(named = "nifi.test.performance", matches = "true")
|
||||
public void testPerformanceOfRandomAccessReads() throws Exception {
|
||||
journalFile = new File("target/storage/" + UUID.randomUUID().toString() + "/testPerformanceOfRandomAccessReads.gz");
|
||||
journalFile = new File("target/storage/" + UUID.randomUUID() + "/testPerformanceOfRandomAccessReads.gz");
|
||||
tocFile = TocUtil.getTocFile(journalFile);
|
||||
|
||||
final int blockSize = 1024 * 32;
|
||||
|
@ -465,13 +457,8 @@ public class TestEventIdFirstSchemaRecordReaderWriter extends AbstractTestRecord
|
|||
}
|
||||
|
||||
private void time(final Callable<StandardProvenanceEventRecord> task, final long id) throws Exception {
|
||||
final long start = System.nanoTime();
|
||||
final StandardProvenanceEventRecord event = task.call();
|
||||
Assert.assertNotNull(event);
|
||||
Assert.assertEquals(id, event.getEventId());
|
||||
// System.out.println(event);
|
||||
final long nanos = System.nanoTime() - start;
|
||||
final long millis = TimeUnit.NANOSECONDS.toMillis(nanos);
|
||||
// System.out.println("Took " + millis + " ms to " + taskDescription);
|
||||
assertNotNull(event);
|
||||
assertEquals(id, event.getEventId());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -17,11 +17,11 @@
|
|||
|
||||
package org.apache.nifi.provenance;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.DataOutputStream;
|
||||
|
@ -59,28 +59,26 @@ import org.apache.nifi.repository.schema.RecordSchema;
|
|||
import org.apache.nifi.repository.schema.Repetition;
|
||||
import org.apache.nifi.repository.schema.SimpleRecordField;
|
||||
import org.apache.nifi.stream.io.NullOutputStream;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.condition.EnabledIfSystemProperty;
|
||||
|
||||
public class TestSchemaRecordReaderWriter extends AbstractTestRecordReaderWriter {
|
||||
private final AtomicLong idGenerator = new AtomicLong(0L);
|
||||
private File journalFile;
|
||||
private File tocFile;
|
||||
|
||||
@Before
|
||||
@BeforeEach
|
||||
public void setup() {
|
||||
journalFile = new File("target/storage/" + UUID.randomUUID().toString() + "/testFieldAddedToSchema");
|
||||
journalFile = new File("target/storage/" + UUID.randomUUID() + "/testFieldAddedToSchema");
|
||||
tocFile = TocUtil.getTocFile(journalFile);
|
||||
idGenerator.set(0L);
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
@Ignore("runs forever for performance analysis/profiling")
|
||||
@EnabledIfSystemProperty(named = "nifi.test.performance", matches = "true")
|
||||
public void testPerformanceOfRandomAccessReads() throws Exception {
|
||||
journalFile = new File("target/storage/" + UUID.randomUUID().toString() + "/testPerformanceOfRandomAccessReads.gz");
|
||||
journalFile = new File("target/storage/" + UUID.randomUUID() + "/testPerformanceOfRandomAccessReads.gz");
|
||||
tocFile = TocUtil.getTocFile(journalFile);
|
||||
|
||||
try (final RecordWriter writer = createWriter(journalFile, new StandardTocWriter(tocFile, true, false), true, 1024 * 32)) {
|
||||
|
@ -117,14 +115,9 @@ public class TestSchemaRecordReaderWriter extends AbstractTestRecordReaderWriter
|
|||
}
|
||||
|
||||
private void time(final Callable<StandardProvenanceEventRecord> task, final long id) throws Exception {
|
||||
final long start = System.nanoTime();
|
||||
final StandardProvenanceEventRecord event = task.call();
|
||||
Assert.assertNotNull(event);
|
||||
Assert.assertEquals(id, event.getEventId());
|
||||
// System.out.println(event);
|
||||
final long nanos = System.nanoTime() - start;
|
||||
final long millis = TimeUnit.NANOSECONDS.toMillis(nanos);
|
||||
// System.out.println("Took " + millis + " ms to " + taskDescription);
|
||||
assertNotNull(event);
|
||||
assertEquals(id, event.getEventId());
|
||||
}
|
||||
|
||||
|
||||
|
@ -291,7 +284,7 @@ public class TestSchemaRecordReaderWriter extends AbstractTestRecordReaderWriter
|
|||
}
|
||||
|
||||
@Test
|
||||
@Ignore("For local testing only")
|
||||
@EnabledIfSystemProperty(named = "nifi.test.performance", matches = "true")
|
||||
public void testWritePerformance() throws IOException {
|
||||
// This is a simple micro-benchmarking test so that we can determine how fast the serialization/deserialization is before
|
||||
// making significant changes. This allows us to ensure that changes that we make do not have significant adverse effects
|
||||
|
@ -317,17 +310,14 @@ public class TestSchemaRecordReaderWriter extends AbstractTestRecordReaderWriter
|
|||
System.out.println("Took " + millis + " millis to write " + numEvents + " events");
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
@Ignore("For local performance testing only")
|
||||
public void testReadPerformance() throws IOException, InterruptedException {
|
||||
@EnabledIfSystemProperty(named = "nifi.test.performance", matches = "true")
|
||||
public void testReadPerformance() throws IOException {
|
||||
// This is a simple micro-benchmarking test so that we can determine how fast the serialization/deserialization is before
|
||||
// making significant changes. This allows us to ensure that changes that we make do not have significant adverse effects
|
||||
// on performance of the repository.
|
||||
final ProvenanceEventRecord event = createEvent();
|
||||
|
||||
final TocReader tocReader = null;
|
||||
|
||||
final byte[] header;
|
||||
try (final ByteArrayOutputStream headerOut = new ByteArrayOutputStream();
|
||||
final DataOutputStream out = new DataOutputStream(headerOut)) {
|
||||
|
@ -356,7 +346,7 @@ public class TestSchemaRecordReaderWriter extends AbstractTestRecordReaderWriter
|
|||
|
||||
final long startNanos = System.nanoTime();
|
||||
try (final InputStream in = new LoopingInputStream(header, serializedRecord);
|
||||
final RecordReader reader = new ByteArraySchemaRecordReader(in, "filename", tocReader, 100000)) {
|
||||
final RecordReader reader = new ByteArraySchemaRecordReader(in, "filename", null, 100000)) {
|
||||
|
||||
for (int i = 0; i < numEvents; i++) {
|
||||
reader.nextRecord();
|
||||
|
@ -377,10 +367,8 @@ public class TestSchemaRecordReaderWriter extends AbstractTestRecordReaderWriter
|
|||
return new ByteArraySchemaRecordWriter(file, idGenerator, tocWriter, compressed, uncompressedBlockSize);
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
protected RecordReader createReader(InputStream in, String journalFilename, TocReader tocReader, int maxAttributeSize) throws IOException {
|
||||
final ByteArraySchemaRecordReader reader = new ByteArraySchemaRecordReader(in, journalFilename, tocReader, maxAttributeSize);
|
||||
return reader;
|
||||
return new ByteArraySchemaRecordReader(in, journalFilename, tocReader, maxAttributeSize);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -17,7 +17,7 @@
|
|||
package org.apache.nifi.provenance;
|
||||
|
||||
import static org.apache.nifi.provenance.TestUtil.createFlowFile;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.DataOutputStream;
|
||||
|
@ -25,6 +25,7 @@ import java.io.File;
|
|||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
|
@ -37,21 +38,21 @@ import org.apache.nifi.provenance.toc.NopTocWriter;
|
|||
import org.apache.nifi.provenance.toc.TocReader;
|
||||
import org.apache.nifi.provenance.toc.TocWriter;
|
||||
import org.apache.nifi.stream.io.NullOutputStream;
|
||||
import org.junit.Before;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.condition.EnabledIfSystemProperty;
|
||||
|
||||
@SuppressWarnings("deprecation")
|
||||
public class TestStandardRecordReaderWriter extends AbstractTestRecordReaderWriter {
|
||||
private AtomicLong idGenerator = new AtomicLong(0L);
|
||||
private final AtomicLong idGenerator = new AtomicLong(0L);
|
||||
|
||||
@Before
|
||||
@BeforeEach
|
||||
public void resetIds() {
|
||||
idGenerator.set(0L);
|
||||
}
|
||||
|
||||
@Test
|
||||
@Ignore("For local testing only")
|
||||
@EnabledIfSystemProperty(named = "nifi.test.performance", matches = "true")
|
||||
public void testWritePerformance() throws IOException {
|
||||
// This is a simple micro-benchmarking test so that we can determine how fast the serialization/deserialization is before
|
||||
// making significant changes. This allows us to ensure that changes that we make do not have significant adverse effects
|
||||
|
@ -78,15 +79,13 @@ public class TestStandardRecordReaderWriter extends AbstractTestRecordReaderWrit
|
|||
}
|
||||
|
||||
@Test
|
||||
@Ignore("For local testing only")
|
||||
@EnabledIfSystemProperty(named = "nifi.test.performance", matches = "true")
|
||||
public void testReadPerformance() throws IOException {
|
||||
// This is a simple micro-benchmarking test so that we can determine how fast the serialization/deserialization is before
|
||||
// making significant changes. This allows us to ensure that changes that we make do not have significant adverse effects
|
||||
// on performance of the repository.
|
||||
final ProvenanceEventRecord event = createEvent();
|
||||
|
||||
final TocReader tocReader = null;
|
||||
|
||||
final byte[] header;
|
||||
try (final ByteArrayOutputStream headerOut = new ByteArrayOutputStream();
|
||||
final DataOutputStream out = new DataOutputStream(headerOut)) {
|
||||
|
@ -110,7 +109,7 @@ public class TestStandardRecordReaderWriter extends AbstractTestRecordReaderWrit
|
|||
final int numEvents = 10_000_000;
|
||||
final long startNanos = System.nanoTime();
|
||||
try (final InputStream in = new LoopingInputStream(header, serializedRecord);
|
||||
final RecordReader reader = new StandardRecordReader(in, "filename", tocReader, 100000)) {
|
||||
final RecordReader reader = new StandardRecordReader(in, "filename", null, 100000)) {
|
||||
|
||||
for (int i = 0; i < numEvents; i++) {
|
||||
reader.nextRecord();
|
||||
|
@ -123,7 +122,7 @@ public class TestStandardRecordReaderWriter extends AbstractTestRecordReaderWrit
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testWriteUtfLargerThan64k() throws IOException, InterruptedException {
|
||||
public void testWriteUtfLargerThan64k() throws IOException {
|
||||
|
||||
final Map<String, String> attributes = new HashMap<>();
|
||||
attributes.put("filename", "1.txt");
|
||||
|
@ -138,7 +137,7 @@ public class TestStandardRecordReaderWriter extends AbstractTestRecordReaderWrit
|
|||
builder.setComponentType("dummy processor");
|
||||
final String seventyK = StringUtils.repeat("X", 70000);
|
||||
assertTrue(seventyK.length() > 65535);
|
||||
assertTrue(seventyK.getBytes("UTF-8").length > 65535);
|
||||
assertTrue(seventyK.getBytes(StandardCharsets.UTF_8).length > 65535);
|
||||
builder.setDetails(seventyK);
|
||||
final ProvenanceEventRecord record = builder.build();
|
||||
|
||||
|
|
|
@ -27,27 +27,27 @@ import org.apache.nifi.provenance.index.EventIndexWriter;
|
|||
import org.apache.nifi.provenance.lucene.IndexManager;
|
||||
import org.apache.nifi.provenance.lucene.LuceneEventIndexWriter;
|
||||
import org.apache.nifi.provenance.serialization.StorageSummary;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.time.Duration;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertTimeout;
|
||||
|
||||
public class TestEventIndexTask {
|
||||
|
||||
@BeforeClass
|
||||
public static void setupClass() {
|
||||
System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi", "DEBUG");
|
||||
@Test
|
||||
public void testIndexWriterCommittedWhenAppropriate() {
|
||||
assertTimeout(Duration.ofSeconds(5), this::runIndexWriteCommittedWhenAppropriate);
|
||||
}
|
||||
|
||||
@Test(timeout = 5000)
|
||||
public void testIndexWriterCommittedWhenAppropriate() throws IOException, InterruptedException {
|
||||
private void runIndexWriteCommittedWhenAppropriate() throws InterruptedException, IOException {
|
||||
final BlockingQueue<StoredDocument> docQueue = new LinkedBlockingQueue<>();
|
||||
final RepositoryConfiguration repoConfig = new RepositoryConfiguration();
|
||||
final File storageDir = new File("target/storage/TestEventIndexTask/1");
|
||||
|
|
|
@ -18,7 +18,7 @@
|
|||
package org.apache.nifi.provenance.index.lucene;
|
||||
|
||||
import org.apache.nifi.provenance.RepositoryConfiguration;
|
||||
import org.junit.Test;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileOutputStream;
|
||||
|
@ -28,8 +28,8 @@ import java.util.ArrayList;
|
|||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
public class TestIndexDirectoryManager {
|
||||
|
||||
|
@ -132,7 +132,7 @@ public class TestIndexDirectoryManager {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testGetDirectoriesBefore() throws InterruptedException {
|
||||
public void testGetDirectoriesBefore() {
|
||||
final RepositoryConfiguration config = createConfig(2);
|
||||
config.setDesiredIndexSize(4096 * 128);
|
||||
|
||||
|
|
|
@ -41,14 +41,12 @@ import org.apache.nifi.provenance.store.ArrayListEventStore;
|
|||
import org.apache.nifi.provenance.store.EventStore;
|
||||
import org.apache.nifi.provenance.store.StorageResult;
|
||||
import org.apache.nifi.util.Tuple;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.TestName;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.Timeout;
|
||||
import org.junit.jupiter.api.condition.DisabledOnOs;
|
||||
import org.junit.jupiter.api.condition.OS;
|
||||
import org.mockito.Mockito;
|
||||
import org.mockito.invocation.InvocationOnMock;
|
||||
import org.mockito.stubbing.Answer;
|
||||
|
||||
import java.io.File;
|
||||
|
@ -64,29 +62,18 @@ import java.util.concurrent.TimeUnit;
|
|||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assume.assumeFalse;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
@DisabledOnOs(OS.WINDOWS)
|
||||
@Timeout(value = 5)
|
||||
public class TestLuceneEventIndex {
|
||||
|
||||
private final AtomicLong idGenerator = new AtomicLong(0L);
|
||||
|
||||
@Rule
|
||||
public TestName testName = new TestName();
|
||||
|
||||
@BeforeClass
|
||||
public static void setLogger() {
|
||||
System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi", "DEBUG");
|
||||
}
|
||||
|
||||
private boolean isWindowsEnvironment() {
|
||||
return System.getProperty("os.name").toLowerCase().startsWith("windows");
|
||||
}
|
||||
|
||||
@Before
|
||||
@BeforeEach
|
||||
public void setup() {
|
||||
idGenerator.set(0L);
|
||||
}
|
||||
|
@ -115,9 +102,8 @@ public class TestLuceneEventIndex {
|
|||
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
@Test
|
||||
public void testGetMinimumIdToReindex() throws InterruptedException {
|
||||
assumeFalse(isWindowsEnvironment());
|
||||
final RepositoryConfiguration repoConfig = createConfig(1);
|
||||
repoConfig.setDesiredIndexSize(1L);
|
||||
final IndexManager indexManager = new StandardIndexManager(repoConfig);
|
||||
|
@ -140,9 +126,8 @@ public class TestLuceneEventIndex {
|
|||
assertTrue(id >= 30000L);
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
@Test
|
||||
public void testUnauthorizedEventsGetPlaceholdersForLineage() throws InterruptedException {
|
||||
assumeFalse(isWindowsEnvironment());
|
||||
final RepositoryConfiguration repoConfig = createConfig(1);
|
||||
repoConfig.setDesiredIndexSize(1L);
|
||||
final IndexManager indexManager = new StandardIndexManager(repoConfig);
|
||||
|
@ -177,88 +162,8 @@ public class TestLuceneEventIndex {
|
|||
}
|
||||
}
|
||||
|
||||
@Ignore("This test is unreliable in certain build environments")
|
||||
@Test(timeout = 60000)
|
||||
public void testUnauthorizedEventsGetPlaceholdersForExpandChildren() throws InterruptedException, IOException {
|
||||
assumeFalse(isWindowsEnvironment());
|
||||
final RepositoryConfiguration repoConfig = createConfig(1);
|
||||
repoConfig.setDesiredIndexSize(1L);
|
||||
final IndexManager indexManager = new StandardIndexManager(repoConfig);
|
||||
|
||||
final ArrayListEventStore eventStore = new ArrayListEventStore();
|
||||
final LuceneEventIndex index = new LuceneEventIndex(repoConfig, indexManager, 3, EventReporter.NO_OP);
|
||||
index.initialize(eventStore);
|
||||
|
||||
final ProvenanceEventRecord firstEvent = createEvent("4444");
|
||||
|
||||
final Map<String, String> previousAttributes = new HashMap<>();
|
||||
previousAttributes.put("uuid", "4444");
|
||||
final Map<String, String> updatedAttributes = new HashMap<>();
|
||||
updatedAttributes.put("updated", "true");
|
||||
final ProvenanceEventRecord fork = new StandardProvenanceEventRecord.Builder()
|
||||
.setEventType(ProvenanceEventType.FORK)
|
||||
.setAttributes(previousAttributes, updatedAttributes)
|
||||
.addChildFlowFile("1234")
|
||||
.setComponentId("component-1")
|
||||
.setComponentType("unit test")
|
||||
.setEventId(idGenerator.getAndIncrement())
|
||||
.setEventTime(System.currentTimeMillis())
|
||||
.setFlowFileEntryDate(System.currentTimeMillis())
|
||||
.setFlowFileUUID("4444")
|
||||
.setLineageStartDate(System.currentTimeMillis())
|
||||
.setCurrentContentClaim("container", "section", "unit-test-id", 0L, 1024L)
|
||||
.build();
|
||||
|
||||
index.addEvents(eventStore.addEvent(firstEvent).getStorageLocations());
|
||||
index.addEvents(eventStore.addEvent(fork).getStorageLocations());
|
||||
|
||||
for (int i = 0; i < 3; i++) {
|
||||
final ProvenanceEventRecord event = createEvent("1234");
|
||||
final StorageResult storageResult = eventStore.addEvent(event);
|
||||
index.addEvents(storageResult.getStorageLocations());
|
||||
}
|
||||
|
||||
final NiFiUser user = createUser();
|
||||
|
||||
final EventAuthorizer allowForkEvents = new EventAuthorizer() {
|
||||
@Override
|
||||
public boolean isAuthorized(ProvenanceEventRecord event) {
|
||||
return event.getEventType() == ProvenanceEventType.FORK;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void authorize(ProvenanceEventRecord event) throws AccessDeniedException {
|
||||
}
|
||||
};
|
||||
|
||||
List<LineageNode> nodes = Collections.emptyList();
|
||||
while (nodes.size() < 5) {
|
||||
final ComputeLineageSubmission submission = index.submitExpandChildren(fork.getEventId(), user, allowForkEvents);
|
||||
assertTrue(submission.getResult().awaitCompletion(15, TimeUnit.SECONDS));
|
||||
|
||||
nodes = submission.getResult().getNodes();
|
||||
Thread.sleep(25L);
|
||||
}
|
||||
|
||||
nodes.forEach(System.out::println);
|
||||
|
||||
assertEquals(5, nodes.size());
|
||||
|
||||
assertEquals(1L, nodes.stream().filter(n -> n.getNodeType() == LineageNodeType.FLOWFILE_NODE).count());
|
||||
assertEquals(4L, nodes.stream().filter(n -> n.getNodeType() == LineageNodeType.PROVENANCE_EVENT_NODE).count());
|
||||
|
||||
final Map<ProvenanceEventType, List<LineageNode>> eventMap = nodes.stream()
|
||||
.filter(n -> n.getNodeType() == LineageNodeType.PROVENANCE_EVENT_NODE)
|
||||
.collect(Collectors.groupingBy(n -> ((ProvenanceEventLineageNode) n).getEventType()));
|
||||
|
||||
assertEquals(2, eventMap.size());
|
||||
assertEquals(1, eventMap.get(ProvenanceEventType.FORK).size());
|
||||
assertEquals(3, eventMap.get(ProvenanceEventType.UNKNOWN).size());
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
@Test
|
||||
public void testUnauthorizedEventsGetPlaceholdersForFindParents() throws InterruptedException {
|
||||
assumeFalse(isWindowsEnvironment());
|
||||
final RepositoryConfiguration repoConfig = createConfig(1);
|
||||
repoConfig.setDesiredIndexSize(1L);
|
||||
final IndexManager indexManager = new StandardIndexManager(repoConfig);
|
||||
|
@ -332,9 +237,8 @@ public class TestLuceneEventIndex {
|
|||
assertEquals("4444", eventMap.get(ProvenanceEventType.UNKNOWN).get(0).getFlowFileUuid());
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
@Test
|
||||
public void testUnauthorizedEventsGetFilteredForQuery() throws InterruptedException {
|
||||
assumeFalse(isWindowsEnvironment());
|
||||
final RepositoryConfiguration repoConfig = createConfig(1);
|
||||
repoConfig.setDesiredIndexSize(1L);
|
||||
final IndexManager indexManager = new StandardIndexManager(repoConfig);
|
||||
|
@ -412,7 +316,7 @@ public class TestLuceneEventIndex {
|
|||
};
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
@Test
|
||||
public void testExpiration() throws IOException {
|
||||
final RepositoryConfiguration repoConfig = createConfig(1);
|
||||
repoConfig.setDesiredIndexSize(1L);
|
||||
|
@ -425,14 +329,11 @@ public class TestLuceneEventIndex {
|
|||
events.add(createEvent());
|
||||
|
||||
final EventStore eventStore = Mockito.mock(EventStore.class);
|
||||
Mockito.doAnswer(new Answer<List<ProvenanceEventRecord>>() {
|
||||
@Override
|
||||
public List<ProvenanceEventRecord> answer(final InvocationOnMock invocation) {
|
||||
final Long eventId = invocation.getArgument(0);
|
||||
assertEquals(0, eventId.longValue());
|
||||
assertEquals(1, invocation.<Integer>getArgument(1).intValue());
|
||||
return Collections.singletonList(events.get(0));
|
||||
}
|
||||
Mockito.doAnswer((Answer<List<ProvenanceEventRecord>>) invocation -> {
|
||||
final Long eventId = invocation.getArgument(0);
|
||||
assertEquals(0, eventId.longValue());
|
||||
assertEquals(1, invocation.<Integer>getArgument(1).intValue());
|
||||
return Collections.singletonList(events.get(0));
|
||||
}).when(eventStore).getEvents(Mockito.anyLong(), Mockito.anyInt());
|
||||
|
||||
index.initialize(eventStore);
|
||||
|
@ -453,9 +354,8 @@ public class TestLuceneEventIndex {
|
|||
return new StorageSummary(eventId, "1.prov", "1", 1, 2L, 2L);
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
@Test
|
||||
public void addThenQueryWithEmptyQuery() throws InterruptedException {
|
||||
assumeFalse(isWindowsEnvironment());
|
||||
final RepositoryConfiguration repoConfig = createConfig();
|
||||
final IndexManager indexManager = new StandardIndexManager(repoConfig);
|
||||
|
||||
|
@ -494,7 +394,7 @@ public class TestLuceneEventIndex {
|
|||
assertEquals(event, matchingEvents.get(0));
|
||||
}
|
||||
|
||||
@Test(timeout = 50000)
|
||||
@Test
|
||||
public void testQuerySpecificField() throws InterruptedException {
|
||||
final RepositoryConfiguration repoConfig = createConfig();
|
||||
final IndexManager indexManager = new StandardIndexManager(repoConfig);
|
||||
|
@ -537,8 +437,8 @@ public class TestLuceneEventIndex {
|
|||
assertEquals(event, matchingEvents.get(0));
|
||||
}
|
||||
|
||||
@Test(timeout = 5000)
|
||||
public void testQueryInverseSpecificField() throws InterruptedException, IOException {
|
||||
@Test
|
||||
public void testQueryInverseSpecificField() throws InterruptedException {
|
||||
final List<SearchableField> searchableFields = new ArrayList<>();
|
||||
searchableFields.add(SearchableFields.ComponentID);
|
||||
searchableFields.add(SearchableFields.FlowFileUUID);
|
||||
|
@ -596,8 +496,7 @@ public class TestLuceneEventIndex {
|
|||
|
||||
private RepositoryConfiguration createConfig(final int storageDirectoryCount) {
|
||||
final RepositoryConfiguration config = new RepositoryConfiguration();
|
||||
final String unitTestName = testName.getMethodName();
|
||||
final File storageDir = new File("target/storage/" + unitTestName + "/" + UUID.randomUUID().toString());
|
||||
final File storageDir = new File("target/storage/" + getClass().getSimpleName() + "/" + UUID.randomUUID());
|
||||
|
||||
for (int i = 0; i < storageDirectoryCount; i++) {
|
||||
config.addStorageDirectory(String.valueOf(i + 1), new File(storageDir, String.valueOf(i)));
|
||||
|
@ -635,7 +534,7 @@ public class TestLuceneEventIndex {
|
|||
final Map<String, String> updatedAttributes = new HashMap<>();
|
||||
updatedAttributes.put("updated", "true");
|
||||
|
||||
final ProvenanceEventRecord event = new StandardProvenanceEventRecord.Builder()
|
||||
return new StandardProvenanceEventRecord.Builder()
|
||||
.setEventType(ProvenanceEventType.CONTENT_MODIFIED)
|
||||
.setAttributes(previousAttributes, updatedAttributes)
|
||||
.setComponentId(componentId)
|
||||
|
@ -647,7 +546,5 @@ public class TestLuceneEventIndex {
|
|||
.setLineageStartDate(timestamp)
|
||||
.setCurrentContentClaim("container", "section", "unit-test-id", 0L, 1024L)
|
||||
.build();
|
||||
|
||||
return event;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -26,9 +26,7 @@ import org.apache.nifi.provenance.RepositoryConfiguration;
|
|||
import org.apache.nifi.provenance.index.EventIndexSearcher;
|
||||
import org.apache.nifi.provenance.index.EventIndexWriter;
|
||||
import org.apache.nifi.util.file.FileUtils;
|
||||
import org.junit.Assert;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileNotFoundException;
|
||||
|
@ -36,20 +34,16 @@ import java.io.IOException;
|
|||
import java.util.UUID;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||
import static org.junit.jupiter.api.Assertions.assertSame;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
|
||||
public class TestSimpleIndexManager {
|
||||
@BeforeClass
|
||||
public static void setLogLevel() {
|
||||
System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.provenance", "DEBUG");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDeletingIndexWhileSearcherActive() throws IOException {
|
||||
final StandardIndexManager mgr = new StandardIndexManager(new RepositoryConfiguration());
|
||||
final File dir = new File("target/" + UUID.randomUUID().toString());
|
||||
final File dir = new File("target/" + UUID.randomUUID());
|
||||
try {
|
||||
final EventIndexWriter writer1 = mgr.borrowIndexWriter(dir);
|
||||
final Document doc1 = new Document();
|
||||
|
@ -73,12 +67,7 @@ public class TestSimpleIndexManager {
|
|||
FileUtils.deleteFile(dir, true);
|
||||
assertFalse(dir.exists());
|
||||
|
||||
try {
|
||||
mgr.borrowIndexSearcher(dir);
|
||||
Assert.fail("Expected FileNotFoundException to be thrown");
|
||||
} catch (final FileNotFoundException fnfe) {
|
||||
// expected
|
||||
}
|
||||
assertThrows(FileNotFoundException.class, () -> mgr.borrowIndexSearcher(dir));
|
||||
} finally {
|
||||
if (dir.exists()) {
|
||||
FileUtils.deleteFile(dir, true);
|
||||
|
@ -90,7 +79,7 @@ public class TestSimpleIndexManager {
|
|||
@Test
|
||||
public void testMultipleWritersSimultaneouslySameIndex() throws IOException {
|
||||
final StandardIndexManager mgr = new StandardIndexManager(new RepositoryConfiguration());
|
||||
final File dir = new File("target/" + UUID.randomUUID().toString());
|
||||
final File dir = new File("target/" + UUID.randomUUID());
|
||||
try {
|
||||
final EventIndexWriter writer1 = mgr.borrowIndexWriter(dir);
|
||||
final EventIndexWriter writer2 = mgr.borrowIndexWriter(dir);
|
||||
|
@ -121,16 +110,16 @@ public class TestSimpleIndexManager {
|
|||
|
||||
final StandardIndexManager mgr = new StandardIndexManager(new RepositoryConfiguration()) {
|
||||
@Override
|
||||
protected void close(IndexWriterCount count) throws IOException {
|
||||
protected void close(IndexWriterCount count) {
|
||||
closeCount.incrementAndGet();
|
||||
}
|
||||
};
|
||||
|
||||
final File dir = new File("target/" + UUID.randomUUID().toString());
|
||||
final File dir = new File("target/" + UUID.randomUUID());
|
||||
|
||||
final EventIndexWriter writer1 = mgr.borrowIndexWriter(dir);
|
||||
final EventIndexWriter writer2 = mgr.borrowIndexWriter(dir);
|
||||
assertTrue(writer1 == writer2);
|
||||
assertSame(writer1, writer2);
|
||||
|
||||
mgr.returnIndexWriter(writer1, true, true);
|
||||
assertEquals(0, closeCount.get());
|
||||
|
@ -138,11 +127,11 @@ public class TestSimpleIndexManager {
|
|||
final EventIndexWriter[] writers = new EventIndexWriter[10];
|
||||
for (int i = 0; i < writers.length; i++) {
|
||||
writers[i] = mgr.borrowIndexWriter(dir);
|
||||
assertTrue(writers[i] == writer1);
|
||||
assertSame(writers[i], writer1);
|
||||
}
|
||||
|
||||
for (int i = 0; i < writers.length; i++) {
|
||||
mgr.returnIndexWriter(writers[i], true, false);
|
||||
for (final EventIndexWriter writer : writers) {
|
||||
mgr.returnIndexWriter(writer, true, false);
|
||||
assertEquals(0, closeCount.get());
|
||||
assertEquals(1, mgr.getWriterCount());
|
||||
}
|
||||
|
@ -161,12 +150,12 @@ public class TestSimpleIndexManager {
|
|||
|
||||
final StandardIndexManager mgr = new StandardIndexManager(new RepositoryConfiguration()) {
|
||||
@Override
|
||||
protected void close(IndexWriterCount count) throws IOException {
|
||||
protected void close(IndexWriterCount count) {
|
||||
closeCount.incrementAndGet();
|
||||
}
|
||||
};
|
||||
|
||||
final File dir = new File("target/" + UUID.randomUUID().toString());
|
||||
final File dir = new File("target/" + UUID.randomUUID());
|
||||
|
||||
final EventIndexWriter writer = mgr.borrowIndexWriter(dir);
|
||||
mgr.returnIndexWriter(writer, true, true);
|
||||
|
@ -179,12 +168,12 @@ public class TestSimpleIndexManager {
|
|||
|
||||
final StandardIndexManager mgr = new StandardIndexManager(new RepositoryConfiguration()) {
|
||||
@Override
|
||||
protected void close(IndexWriterCount count) throws IOException {
|
||||
protected void close(IndexWriterCount count) {
|
||||
closeCount.incrementAndGet();
|
||||
}
|
||||
};
|
||||
|
||||
final File dir = new File("target/" + UUID.randomUUID().toString());
|
||||
final File dir = new File("target/" + UUID.randomUUID());
|
||||
|
||||
final EventIndexWriter writer = mgr.borrowIndexWriter(dir);
|
||||
mgr.returnIndexWriter(writer, true, false);
|
||||
|
|
|
@ -23,8 +23,6 @@ import org.apache.nifi.provenance.authorization.EventTransformer;
|
|||
import org.apache.nifi.provenance.index.EventIndex;
|
||||
import org.apache.nifi.provenance.serialization.StorageSummary;
|
||||
import org.apache.nifi.provenance.store.iterator.EventIterator;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
|
@ -36,8 +34,6 @@ import java.util.Optional;
|
|||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
public class ArrayListEventStore implements EventStore {
|
||||
private static final Logger logger = LoggerFactory.getLogger(ArrayListEventStore.class);
|
||||
|
||||
private final List<ProvenanceEventRecord> events = new ArrayList<>();
|
||||
private final AtomicLong idGenerator = new AtomicLong(0L);
|
||||
|
||||
|
@ -93,7 +89,7 @@ public class ArrayListEventStore implements EventStore {
|
|||
}
|
||||
|
||||
@Override
|
||||
public synchronized Optional<ProvenanceEventRecord> getEvent(long id) throws IOException {
|
||||
public synchronized Optional<ProvenanceEventRecord> getEvent(long id) {
|
||||
if (events.size() <= id) {
|
||||
return Optional.empty();
|
||||
}
|
||||
|
@ -129,7 +125,6 @@ public class ArrayListEventStore implements EventStore {
|
|||
try {
|
||||
eventOption = getEvent(eventId);
|
||||
} catch (final Exception e) {
|
||||
logger.warn("Failed to retrieve event with ID " + eventId, e);
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -141,9 +136,7 @@ public class ArrayListEventStore implements EventStore {
|
|||
events.add(eventOption.get());
|
||||
} else {
|
||||
final Optional<ProvenanceEventRecord> transformedOption = transformer.transform(eventOption.get());
|
||||
if (transformedOption.isPresent()) {
|
||||
events.add(transformedOption.get());
|
||||
}
|
||||
transformedOption.ifPresent(events::add);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -17,17 +17,19 @@
|
|||
|
||||
package org.apache.nifi.provenance.store;
|
||||
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
import java.io.File;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import org.junit.Test;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.Timeout;
|
||||
|
||||
@Timeout(value = 5)
|
||||
public class TestEventFileManager {
|
||||
|
||||
@Test(timeout = 5000)
|
||||
@Test
|
||||
public void testTwoWriteLocks() throws InterruptedException {
|
||||
final EventFileManager fileManager = new EventFileManager();
|
||||
final File f1 = new File("1.prov");
|
||||
|
@ -35,41 +37,35 @@ public class TestEventFileManager {
|
|||
|
||||
final AtomicBoolean obtained = new AtomicBoolean(false);
|
||||
|
||||
final Thread t1 = new Thread(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
fileManager.obtainWriteLock(f1);
|
||||
final Thread t1 = new Thread(() -> {
|
||||
fileManager.obtainWriteLock(f1);
|
||||
|
||||
synchronized (obtained) {
|
||||
obtained.set(true);
|
||||
obtained.notify();
|
||||
}
|
||||
|
||||
try {
|
||||
Thread.sleep(500L);
|
||||
} catch (InterruptedException e) {
|
||||
}
|
||||
fileManager.releaseWriteLock(f1);
|
||||
synchronized (obtained) {
|
||||
obtained.set(true);
|
||||
obtained.notify();
|
||||
}
|
||||
|
||||
try {
|
||||
Thread.sleep(500L);
|
||||
} catch (InterruptedException e) {
|
||||
}
|
||||
fileManager.releaseWriteLock(f1);
|
||||
});
|
||||
|
||||
t1.start();
|
||||
|
||||
final Thread t2 = new Thread(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
synchronized (obtained) {
|
||||
while (!obtained.get()) {
|
||||
try {
|
||||
obtained.wait();
|
||||
} catch (InterruptedException e) {
|
||||
}
|
||||
final Thread t2 = new Thread(() -> {
|
||||
synchronized (obtained) {
|
||||
while (!obtained.get()) {
|
||||
try {
|
||||
obtained.wait();
|
||||
} catch (InterruptedException e) {
|
||||
}
|
||||
}
|
||||
|
||||
fileManager.obtainWriteLock(gz);
|
||||
fileManager.releaseWriteLock(gz);
|
||||
}
|
||||
|
||||
fileManager.obtainWriteLock(gz);
|
||||
fileManager.releaseWriteLock(gz);
|
||||
});
|
||||
|
||||
final long start = System.nanoTime();
|
||||
|
@ -80,7 +76,7 @@ public class TestEventFileManager {
|
|||
}
|
||||
|
||||
|
||||
@Test(timeout = 5000)
|
||||
@Test
|
||||
public void testTwoReadLocks() throws InterruptedException {
|
||||
final EventFileManager fileManager = new EventFileManager();
|
||||
final File f1 = new File("1.prov");
|
||||
|
@ -88,41 +84,35 @@ public class TestEventFileManager {
|
|||
|
||||
final AtomicBoolean obtained = new AtomicBoolean(false);
|
||||
|
||||
final Thread t1 = new Thread(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
fileManager.obtainReadLock(f1);
|
||||
final Thread t1 = new Thread(() -> {
|
||||
fileManager.obtainReadLock(f1);
|
||||
|
||||
synchronized (obtained) {
|
||||
obtained.set(true);
|
||||
obtained.notify();
|
||||
}
|
||||
|
||||
try {
|
||||
Thread.sleep(100000L);
|
||||
} catch (InterruptedException e) {
|
||||
}
|
||||
fileManager.releaseReadLock(f1);
|
||||
synchronized (obtained) {
|
||||
obtained.set(true);
|
||||
obtained.notify();
|
||||
}
|
||||
|
||||
try {
|
||||
Thread.sleep(100000L);
|
||||
} catch (InterruptedException e) {
|
||||
}
|
||||
fileManager.releaseReadLock(f1);
|
||||
});
|
||||
|
||||
t1.start();
|
||||
|
||||
final Thread t2 = new Thread(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
synchronized (obtained) {
|
||||
while (!obtained.get()) {
|
||||
try {
|
||||
obtained.wait();
|
||||
} catch (InterruptedException e) {
|
||||
}
|
||||
final Thread t2 = new Thread(() -> {
|
||||
synchronized (obtained) {
|
||||
while (!obtained.get()) {
|
||||
try {
|
||||
obtained.wait();
|
||||
} catch (InterruptedException e) {
|
||||
}
|
||||
}
|
||||
|
||||
fileManager.obtainReadLock(gz);
|
||||
fileManager.releaseReadLock(gz);
|
||||
}
|
||||
|
||||
fileManager.obtainReadLock(gz);
|
||||
fileManager.releaseReadLock(gz);
|
||||
});
|
||||
|
||||
final long start = System.nanoTime();
|
||||
|
@ -133,7 +123,7 @@ public class TestEventFileManager {
|
|||
}
|
||||
|
||||
|
||||
@Test(timeout = 5000)
|
||||
@Test
|
||||
public void testWriteThenRead() throws InterruptedException {
|
||||
final EventFileManager fileManager = new EventFileManager();
|
||||
final File f1 = new File("1.prov");
|
||||
|
@ -141,41 +131,35 @@ public class TestEventFileManager {
|
|||
|
||||
final AtomicBoolean obtained = new AtomicBoolean(false);
|
||||
|
||||
final Thread t1 = new Thread(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
fileManager.obtainWriteLock(f1);
|
||||
final Thread t1 = new Thread(() -> {
|
||||
fileManager.obtainWriteLock(f1);
|
||||
|
||||
synchronized (obtained) {
|
||||
obtained.set(true);
|
||||
obtained.notify();
|
||||
}
|
||||
|
||||
try {
|
||||
Thread.sleep(500L);
|
||||
} catch (InterruptedException e) {
|
||||
}
|
||||
fileManager.releaseWriteLock(f1);
|
||||
synchronized (obtained) {
|
||||
obtained.set(true);
|
||||
obtained.notify();
|
||||
}
|
||||
|
||||
try {
|
||||
Thread.sleep(500L);
|
||||
} catch (InterruptedException e) {
|
||||
}
|
||||
fileManager.releaseWriteLock(f1);
|
||||
});
|
||||
|
||||
t1.start();
|
||||
|
||||
final Thread t2 = new Thread(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
synchronized (obtained) {
|
||||
while (!obtained.get()) {
|
||||
try {
|
||||
obtained.wait();
|
||||
} catch (InterruptedException e) {
|
||||
}
|
||||
final Thread t2 = new Thread(() -> {
|
||||
synchronized (obtained) {
|
||||
while (!obtained.get()) {
|
||||
try {
|
||||
obtained.wait();
|
||||
} catch (InterruptedException e) {
|
||||
}
|
||||
}
|
||||
|
||||
fileManager.obtainReadLock(gz);
|
||||
fileManager.releaseReadLock(gz);
|
||||
}
|
||||
|
||||
fileManager.obtainReadLock(gz);
|
||||
fileManager.releaseReadLock(gz);
|
||||
});
|
||||
|
||||
final long start = System.nanoTime();
|
||||
|
@ -186,7 +170,7 @@ public class TestEventFileManager {
|
|||
}
|
||||
|
||||
|
||||
@Test(timeout = 5000)
|
||||
@Test
|
||||
public void testReadThenWrite() throws InterruptedException {
|
||||
final EventFileManager fileManager = new EventFileManager();
|
||||
final File f1 = new File("1.prov");
|
||||
|
@ -194,41 +178,35 @@ public class TestEventFileManager {
|
|||
|
||||
final AtomicBoolean obtained = new AtomicBoolean(false);
|
||||
|
||||
final Thread t1 = new Thread(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
fileManager.obtainReadLock(f1);
|
||||
final Thread t1 = new Thread(() -> {
|
||||
fileManager.obtainReadLock(f1);
|
||||
|
||||
synchronized (obtained) {
|
||||
obtained.set(true);
|
||||
obtained.notify();
|
||||
}
|
||||
|
||||
try {
|
||||
Thread.sleep(500L);
|
||||
} catch (InterruptedException e) {
|
||||
}
|
||||
fileManager.releaseReadLock(f1);
|
||||
synchronized (obtained) {
|
||||
obtained.set(true);
|
||||
obtained.notify();
|
||||
}
|
||||
|
||||
try {
|
||||
Thread.sleep(500L);
|
||||
} catch (InterruptedException e) {
|
||||
}
|
||||
fileManager.releaseReadLock(f1);
|
||||
});
|
||||
|
||||
t1.start();
|
||||
|
||||
final Thread t2 = new Thread(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
synchronized (obtained) {
|
||||
while (!obtained.get()) {
|
||||
try {
|
||||
obtained.wait();
|
||||
} catch (InterruptedException e) {
|
||||
}
|
||||
final Thread t2 = new Thread(() -> {
|
||||
synchronized (obtained) {
|
||||
while (!obtained.get()) {
|
||||
try {
|
||||
obtained.wait();
|
||||
} catch (InterruptedException e) {
|
||||
}
|
||||
}
|
||||
|
||||
fileManager.obtainWriteLock(gz);
|
||||
fileManager.releaseWriteLock(gz);
|
||||
}
|
||||
|
||||
fileManager.obtainWriteLock(gz);
|
||||
fileManager.releaseWriteLock(gz);
|
||||
});
|
||||
|
||||
final long start = System.nanoTime();
|
||||
|
|
|
@ -34,11 +34,9 @@ import org.apache.nifi.provenance.store.iterator.EventIterator;
|
|||
import org.apache.nifi.provenance.toc.StandardTocWriter;
|
||||
import org.apache.nifi.provenance.toc.TocUtil;
|
||||
import org.apache.nifi.provenance.toc.TocWriter;
|
||||
import org.junit.Before;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.TestName;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.condition.EnabledIfSystemProperty;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
|
@ -54,10 +52,10 @@ import java.util.concurrent.Callable;
|
|||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
public class TestPartitionedWriteAheadEventStore {
|
||||
private static final RecordWriterFactory writerFactory = (file, idGen, compress, createToc) -> RecordWriters.newSchemaRecordWriter(file, idGen, compress, createToc);
|
||||
|
@ -65,17 +63,13 @@ public class TestPartitionedWriteAheadEventStore {
|
|||
|
||||
private final AtomicLong idGenerator = new AtomicLong(0L);
|
||||
|
||||
@Rule
|
||||
public TestName testName = new TestName();
|
||||
|
||||
@Before
|
||||
@BeforeEach
|
||||
public void resetIds() {
|
||||
idGenerator.set(0L);
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
@Ignore
|
||||
@EnabledIfSystemProperty(named = "nifi.test.performance", matches = "true")
|
||||
public void testPerformanceOfAccessingEvents() throws Exception {
|
||||
final RecordWriterFactory recordWriterFactory = (file, idGenerator, compressed, createToc) -> {
|
||||
final TocWriter tocWriter = createToc ? new StandardTocWriter(TocUtil.getTocFile(file), false, false) : null;
|
||||
|
@ -465,8 +459,8 @@ public class TestPartitionedWriteAheadEventStore {
|
|||
|
||||
private RepositoryConfiguration createConfig(final int numStorageDirs) {
|
||||
final RepositoryConfiguration config = new RepositoryConfiguration();
|
||||
final String unitTestName = testName.getMethodName();
|
||||
final File storageDir = new File("target/storage/" + unitTestName + "/" + UUID.randomUUID().toString());
|
||||
final String unitTestName = getClass().getSimpleName();
|
||||
final File storageDir = new File("target/storage/" + unitTestName + "/" + UUID.randomUUID());
|
||||
|
||||
for (int i = 1; i <= numStorageDirs; i++) {
|
||||
config.addStorageDirectory(String.valueOf(i), new File(storageDir, String.valueOf(i)));
|
||||
|
|
|
@ -30,7 +30,7 @@ import org.apache.nifi.provenance.toc.StandardTocWriter;
|
|||
import org.apache.nifi.provenance.toc.TocUtil;
|
||||
import org.apache.nifi.provenance.toc.TocWriter;
|
||||
import org.apache.nifi.provenance.util.DirectoryUtils;
|
||||
import org.junit.Test;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.mockito.Mockito;
|
||||
import org.mockito.invocation.InvocationOnMock;
|
||||
import org.mockito.stubbing.Answer;
|
||||
|
@ -47,13 +47,12 @@ import java.util.concurrent.LinkedBlockingQueue;
|
|||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
public class TestWriteAheadStorePartition {
|
||||
|
||||
@Test
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testReindex() throws IOException {
|
||||
final RepositoryConfiguration repoConfig = createConfig(1, "testReindex");
|
||||
repoConfig.setMaxEventFileCount(5);
|
||||
|
@ -79,7 +78,7 @@ public class TestWriteAheadStorePartition {
|
|||
final EventIndex eventIndex = Mockito.mock(EventIndex.class);
|
||||
Mockito.doAnswer(new Answer<Object>() {
|
||||
@Override
|
||||
public Object answer(final InvocationOnMock invocation) throws Throwable {
|
||||
public Object answer(final InvocationOnMock invocation) {
|
||||
final Map<ProvenanceEventRecord, StorageSummary> events = invocation.getArgument(0);
|
||||
reindexedEvents.putAll(events);
|
||||
return null;
|
||||
|
@ -142,7 +141,7 @@ public class TestWriteAheadStorePartition {
|
|||
|
||||
private RepositoryConfiguration createConfig(final int numStorageDirs, final String testName) {
|
||||
final RepositoryConfiguration config = new RepositoryConfiguration();
|
||||
final File storageDir = new File("target/storage/" + testName + "/" + UUID.randomUUID().toString());
|
||||
final File storageDir = new File("target/storage/" + testName + "/" + UUID.randomUUID());
|
||||
|
||||
for (int i = 1; i <= numStorageDirs; i++) {
|
||||
config.addStorageDirectory(String.valueOf(1), new File(storageDir, String.valueOf(i)));
|
||||
|
|
|
@ -17,8 +17,9 @@
|
|||
|
||||
package org.apache.nifi.provenance.store.iterator;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
|
@ -42,14 +43,11 @@ import org.apache.nifi.provenance.store.RecordReaderFactory;
|
|||
import org.apache.nifi.provenance.toc.StandardTocWriter;
|
||||
import org.apache.nifi.provenance.toc.TocUtil;
|
||||
import org.apache.nifi.provenance.toc.TocWriter;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.condition.EnabledIfSystemProperty;
|
||||
|
||||
public class TestSelectiveRecordReaderEventIterator {
|
||||
|
||||
|
||||
private RecordWriter createWriter(final File file, final TocWriter tocWriter, final boolean compressed, final int uncompressedBlockSize) throws IOException {
|
||||
return new EventIdFirstSchemaRecordWriter(file, new AtomicLong(0L), tocWriter, compressed, uncompressedBlockSize, IdentifierLookup.EMPTY);
|
||||
}
|
||||
|
@ -74,7 +72,7 @@ public class TestSelectiveRecordReaderEventIterator {
|
|||
eventIds.add(3048L);
|
||||
|
||||
List<File> filteredFiles = SelectiveRecordReaderEventIterator.filterUnneededFiles(files, eventIds);
|
||||
assertEquals(Arrays.asList(new File[] {file1000, file2000, file3000}), filteredFiles);
|
||||
assertEquals(Arrays.asList(file1000, file2000, file3000), filteredFiles);
|
||||
|
||||
// Filter out file at end
|
||||
eventIds.clear();
|
||||
|
@ -82,7 +80,7 @@ public class TestSelectiveRecordReaderEventIterator {
|
|||
eventIds.add(1048L);
|
||||
|
||||
filteredFiles = SelectiveRecordReaderEventIterator.filterUnneededFiles(files, eventIds);
|
||||
assertEquals(Arrays.asList(new File[] {file1, file1000}), filteredFiles);
|
||||
assertEquals(Arrays.asList(file1, file1000), filteredFiles);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -97,9 +95,7 @@ public class TestSelectiveRecordReaderEventIterator {
|
|||
eventIds.add(1L);
|
||||
eventIds.add(5L);
|
||||
|
||||
final RecordReaderFactory readerFactory = (file, logs, maxChars) -> {
|
||||
return RecordReaders.newRecordReader(file, logs, maxChars);
|
||||
};
|
||||
final RecordReaderFactory readerFactory = RecordReaders::newRecordReader;
|
||||
|
||||
final SelectiveRecordReaderEventIterator itr = new SelectiveRecordReaderEventIterator(files, readerFactory, eventIds, 65536);
|
||||
final Optional<ProvenanceEventRecord> firstRecordOption = itr.nextEvent();
|
||||
|
@ -107,9 +103,9 @@ public class TestSelectiveRecordReaderEventIterator {
|
|||
}
|
||||
|
||||
@Test
|
||||
@Ignore("For local testing only. Runs indefinitely")
|
||||
@EnabledIfSystemProperty(named = "nifi.test.performance", matches = "true")
|
||||
public void testPerformanceOfRandomAccessReads() throws Exception {
|
||||
final File dir = new File("target/storage/" + UUID.randomUUID().toString());
|
||||
final File dir = new File("target/storage/" + UUID.randomUUID());
|
||||
final File journalFile = new File(dir, "/4.prov.gz");
|
||||
final File tocFile = TocUtil.getTocFile(journalFile);
|
||||
|
||||
|
@ -126,7 +122,7 @@ public class TestSelectiveRecordReaderEventIterator {
|
|||
4L, 80L, 1024L, 1025L, 1026L, 1027L, 1028L, 1029L, 1030L, 40_000L, 80_000L, 99_000L
|
||||
};
|
||||
|
||||
final RecordReaderFactory readerFactory = (file, logs, maxChars) -> RecordReaders.newRecordReader(file, logs, maxChars);
|
||||
final RecordReaderFactory readerFactory = RecordReaders::newRecordReader;
|
||||
|
||||
final List<File> files = new ArrayList<>();
|
||||
files.add(new File(dir, "0.prov"));
|
||||
|
@ -145,9 +141,7 @@ public class TestSelectiveRecordReaderEventIterator {
|
|||
Collections.singletonList(journalFile), readerFactory, Arrays.asList(eventIds), 32 * 1024);
|
||||
|
||||
for (final long id : eventIds) {
|
||||
time(() -> {
|
||||
return iterator.nextEvent().orElse(null);
|
||||
}, id);
|
||||
time(() -> iterator.nextEvent().orElse(null), id);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -157,13 +151,8 @@ public class TestSelectiveRecordReaderEventIterator {
|
|||
}
|
||||
|
||||
private void time(final Callable<ProvenanceEventRecord> task, final long id) throws Exception {
|
||||
final long start = System.nanoTime();
|
||||
final ProvenanceEventRecord event = task.call();
|
||||
Assert.assertNotNull(event);
|
||||
Assert.assertEquals(id, event.getEventId());
|
||||
// System.out.println(event);
|
||||
final long nanos = System.nanoTime() - start;
|
||||
final long millis = TimeUnit.NANOSECONDS.toMillis(nanos);
|
||||
// System.out.println("Took " + millis + " ms to " + taskDescription);
|
||||
assertNotNull(event);
|
||||
assertEquals(id, event.getEventId());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -16,9 +16,9 @@
|
|||
*/
|
||||
package org.apache.nifi.provenance.toc;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
import java.io.DataOutputStream;
|
||||
import java.io.File;
|
||||
|
@ -27,13 +27,13 @@ import java.io.IOException;
|
|||
import java.io.OutputStream;
|
||||
import java.util.UUID;
|
||||
|
||||
import org.junit.Test;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
public class TestStandardTocReader {
|
||||
|
||||
@Test
|
||||
public void testDetectsCompression() throws IOException {
|
||||
final File file = new File("target/" + UUID.randomUUID().toString());
|
||||
final File file = new File("target/" + UUID.randomUUID());
|
||||
try (final OutputStream out = new FileOutputStream(file)) {
|
||||
out.write(0);
|
||||
out.write(0);
|
||||
|
@ -65,7 +65,7 @@ public class TestStandardTocReader {
|
|||
|
||||
@Test
|
||||
public void testGetBlockIndexV1() throws IOException {
|
||||
final File file = new File("target/" + UUID.randomUUID().toString());
|
||||
final File file = new File("target/" + UUID.randomUUID());
|
||||
try (final OutputStream out = new FileOutputStream(file);
|
||||
final DataOutputStream dos = new DataOutputStream(out)) {
|
||||
out.write(1);
|
||||
|
@ -91,7 +91,7 @@ public class TestStandardTocReader {
|
|||
|
||||
@Test
|
||||
public void testGetBlockIndexV2() throws IOException {
|
||||
final File file = new File("target/" + UUID.randomUUID().toString());
|
||||
final File file = new File("target/" + UUID.randomUUID());
|
||||
try (final OutputStream out = new FileOutputStream(file);
|
||||
final DataOutputStream dos = new DataOutputStream(out)) {
|
||||
out.write(2);
|
||||
|
|
|
@ -16,21 +16,21 @@
|
|||
*/
|
||||
package org.apache.nifi.provenance.toc;
|
||||
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.UUID;
|
||||
|
||||
import org.apache.nifi.util.file.FileUtils;
|
||||
import org.junit.Test;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
public class TestStandardTocWriter {
|
||||
@Test
|
||||
public void testOverwriteEmptyFile() throws IOException {
|
||||
final File tocFile = new File("target/" + UUID.randomUUID().toString() + ".toc");
|
||||
final File tocFile = new File("target/" + UUID.randomUUID() + ".toc");
|
||||
try {
|
||||
assertTrue( tocFile.createNewFile() );
|
||||
assertTrue(tocFile.createNewFile());
|
||||
|
||||
try (final StandardTocWriter writer = new StandardTocWriter(tocFile, false, false)) {
|
||||
}
|
||||
|
@ -38,5 +38,4 @@ public class TestStandardTocWriter {
|
|||
FileUtils.deleteFile(tocFile, false);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -22,8 +22,9 @@ import org.apache.nifi.provenance.search.Query;
|
|||
import org.apache.nifi.provenance.search.QuerySubmission;
|
||||
import org.apache.nifi.provenance.search.SearchTerms;
|
||||
import org.apache.nifi.util.NiFiProperties;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.BeforeAll;
|
||||
import org.junit.jupiter.api.Timeout;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
|
@ -33,19 +34,20 @@ import java.util.Map;
|
|||
import java.util.Set;
|
||||
import java.util.UUID;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
|
||||
@Timeout(value = 5)
|
||||
public class TestVolatileProvenanceRepository {
|
||||
|
||||
private VolatileProvenanceRepository repo;
|
||||
|
||||
@BeforeClass
|
||||
@BeforeAll
|
||||
public static void setup() {
|
||||
System.setProperty(NiFiProperties.PROPERTIES_FILE_PATH, TestVolatileProvenanceRepository.class.getResource("/nifi.properties").getFile());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAddAndGet() throws IOException, InterruptedException {
|
||||
public void testAddAndGet() throws IOException {
|
||||
repo = new VolatileProvenanceRepository(NiFiProperties.createBasicNiFiProperties(null));
|
||||
|
||||
final Map<String, String> attributes = new HashMap<>();
|
||||
|
@ -119,7 +121,7 @@ public class TestVolatileProvenanceRepository {
|
|||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 1000)
|
||||
@Test
|
||||
public void testSearchForInverseValue() throws InterruptedException {
|
||||
repo = new VolatileProvenanceRepository(NiFiProperties.createBasicNiFiProperties(null));
|
||||
|
||||
|
@ -239,17 +241,17 @@ public class TestVolatileProvenanceRepository {
|
|||
|
||||
@Override
|
||||
public Set<String> getGroups() {
|
||||
return Collections.EMPTY_SET;
|
||||
return Collections.emptySet();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<String> getIdentityProviderGroups() {
|
||||
return Collections.EMPTY_SET;
|
||||
return Collections.emptySet();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<String> getAllGroups() {
|
||||
return Collections.EMPTY_SET;
|
||||
return Collections.emptySet();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
Loading…
Reference in New Issue