mirror of https://github.com/apache/nifi.git
NIFI-9366 Create and remove provenance repository directory in test methods
- Updated nifi-persistent-provenance-repository tests This closes #5510 Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
parent
9e845d87ff
commit
f86fe0d61a
|
@ -18,7 +18,6 @@ package org.apache.nifi.provenance
|
||||||
|
|
||||||
import org.apache.nifi.events.EventReporter
|
import org.apache.nifi.events.EventReporter
|
||||||
import org.apache.nifi.flowfile.FlowFile
|
import org.apache.nifi.flowfile.FlowFile
|
||||||
import org.apache.nifi.provenance.serialization.RecordReaders
|
|
||||||
import org.apache.nifi.reporting.Severity
|
import org.apache.nifi.reporting.Severity
|
||||||
import org.apache.nifi.security.kms.StaticKeyProvider
|
import org.apache.nifi.security.kms.StaticKeyProvider
|
||||||
import org.apache.nifi.util.NiFiProperties
|
import org.apache.nifi.util.NiFiProperties
|
||||||
|
@ -34,6 +33,9 @@ import java.util.concurrent.TimeUnit
|
||||||
import java.util.concurrent.atomic.AtomicLong
|
import java.util.concurrent.atomic.AtomicLong
|
||||||
|
|
||||||
import static org.apache.nifi.provenance.TestUtil.createFlowFile
|
import static org.apache.nifi.provenance.TestUtil.createFlowFile
|
||||||
|
import static org.hamcrest.MatcherAssert.assertThat
|
||||||
|
import static org.hamcrest.CoreMatchers.is
|
||||||
|
import static org.hamcrest.CoreMatchers.hasItems
|
||||||
|
|
||||||
class EncryptedWriteAheadProvenanceRepositoryTest {
|
class EncryptedWriteAheadProvenanceRepositoryTest {
|
||||||
private static final String KEY_HEX_128 = "0123456789ABCDEFFEDCBA9876543210"
|
private static final String KEY_HEX_128 = "0123456789ABCDEFFEDCBA9876543210"
|
||||||
|
@ -48,6 +50,7 @@ class EncryptedWriteAheadProvenanceRepositoryTest {
|
||||||
|
|
||||||
private ProvenanceRepository repo
|
private ProvenanceRepository repo
|
||||||
private static RepositoryConfiguration config
|
private static RepositoryConfiguration config
|
||||||
|
private File provenanceRepositoryDirectory
|
||||||
|
|
||||||
private EventReporter eventReporter
|
private EventReporter eventReporter
|
||||||
private List<ReportedEvent> reportedEvents = Collections.synchronizedList(new ArrayList<ReportedEvent>())
|
private List<ReportedEvent> reportedEvents = Collections.synchronizedList(new ArrayList<ReportedEvent>())
|
||||||
|
@ -59,6 +62,7 @@ class EncryptedWriteAheadProvenanceRepositoryTest {
|
||||||
|
|
||||||
@BeforeEach
|
@BeforeEach
|
||||||
void setUp() throws Exception {
|
void setUp() throws Exception {
|
||||||
|
provenanceRepositoryDirectory = File.createTempDir(getClass().simpleName)
|
||||||
reportedEvents?.clear()
|
reportedEvents?.clear()
|
||||||
eventReporter = createMockEventReporter()
|
eventReporter = createMockEventReporter()
|
||||||
}
|
}
|
||||||
|
@ -66,11 +70,14 @@ class EncryptedWriteAheadProvenanceRepositoryTest {
|
||||||
@AfterEach
|
@AfterEach
|
||||||
void tearDown() throws Exception {
|
void tearDown() throws Exception {
|
||||||
closeRepo(repo, config)
|
closeRepo(repo, config)
|
||||||
|
if (provenanceRepositoryDirectory != null & provenanceRepositoryDirectory.isDirectory()) {
|
||||||
|
provenanceRepositoryDirectory.deleteDir()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static RepositoryConfiguration createConfiguration() {
|
private static RepositoryConfiguration createConfiguration(final File provenanceDir) {
|
||||||
RepositoryConfiguration config = new RepositoryConfiguration()
|
final RepositoryConfiguration config = new RepositoryConfiguration()
|
||||||
config.addStorageDirectory("1", File.createTempDir(getClass().simpleName))
|
config.addStorageDirectory("1", provenanceDir)
|
||||||
config.setCompressOnRollover(true)
|
config.setCompressOnRollover(true)
|
||||||
config.setMaxEventFileLife(2000L, TimeUnit.SECONDS)
|
config.setMaxEventFileLife(2000L, TimeUnit.SECONDS)
|
||||||
config.setCompressionBlockBytes(100)
|
config.setCompressionBlockBytes(100)
|
||||||
|
@ -84,14 +91,15 @@ class EncryptedWriteAheadProvenanceRepositoryTest {
|
||||||
}] as EventReporter
|
}] as EventReporter
|
||||||
}
|
}
|
||||||
|
|
||||||
private void closeRepo(ProvenanceRepository repo = this.repo, RepositoryConfiguration config = this.config) throws IOException {
|
private void closeRepo(final ProvenanceRepository repo = this.repo, final RepositoryConfiguration config = this.config) throws IOException {
|
||||||
if (repo == null) {
|
if (repo == null) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
repo.close()
|
repo.close()
|
||||||
} catch (IOException ioe) {
|
} catch (final IOException ignored) {
|
||||||
|
// intentionally blank
|
||||||
}
|
}
|
||||||
|
|
||||||
// Delete all of the storage files. We do this in order to clean up the tons of files that
|
// Delete all of the storage files. We do this in order to clean up the tons of files that
|
||||||
|
@ -99,8 +107,7 @@ class EncryptedWriteAheadProvenanceRepositoryTest {
|
||||||
// streams open, for instance, this will throw an IOException, causing our unit test to fail.
|
// streams open, for instance, this will throw an IOException, causing our unit test to fail.
|
||||||
if (config != null) {
|
if (config != null) {
|
||||||
for (final File storageDir : config.getStorageDirectories().values()) {
|
for (final File storageDir : config.getStorageDirectories().values()) {
|
||||||
int i
|
for (int i = 0; i < 3; i++) {
|
||||||
for (i = 0; i < 3; i++) {
|
|
||||||
try {
|
try {
|
||||||
FileUtils.deleteFile(storageDir, true)
|
FileUtils.deleteFile(storageDir, true)
|
||||||
break
|
break
|
||||||
|
@ -122,7 +129,8 @@ class EncryptedWriteAheadProvenanceRepositoryTest {
|
||||||
} else {
|
} else {
|
||||||
try {
|
try {
|
||||||
Thread.sleep(1000L)
|
Thread.sleep(1000L)
|
||||||
} catch (final InterruptedException ie) {
|
} catch (final InterruptedException ignored) {
|
||||||
|
// intentionally blank
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -133,16 +141,16 @@ class EncryptedWriteAheadProvenanceRepositoryTest {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static
|
private static final FlowFile buildFlowFile(final Map attributes = [:], final long id = recordId.getAndIncrement(), final long fileSize = 3000L) {
|
||||||
final FlowFile buildFlowFile(Map attributes = [:], long id = recordId.getAndIncrement(), long fileSize = 3000L) {
|
|
||||||
if (!attributes?.uuid) {
|
if (!attributes?.uuid) {
|
||||||
attributes.uuid = UUID.randomUUID().toString()
|
attributes.uuid = UUID.randomUUID().toString()
|
||||||
}
|
}
|
||||||
createFlowFile(id, fileSize, attributes)
|
createFlowFile(id, fileSize, attributes)
|
||||||
}
|
}
|
||||||
|
|
||||||
private
|
private static ProvenanceEventRecord buildEventRecord(final FlowFile flowfile = buildFlowFile(), final ProvenanceEventType eventType = ProvenanceEventType.RECEIVE,
|
||||||
static ProvenanceEventRecord buildEventRecord(FlowFile flowfile = buildFlowFile(), ProvenanceEventType eventType = ProvenanceEventType.RECEIVE, String transitUri = TRANSIT_URI, String componentId = COMPONENT_ID, String componentType = PROCESSOR_TYPE, long eventTime = System.currentTimeMillis()) {
|
final String transitUri = TRANSIT_URI, final String componentId = COMPONENT_ID,
|
||||||
|
final String componentType = PROCESSOR_TYPE, final long eventTime = System.currentTimeMillis()) {
|
||||||
final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder()
|
final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder()
|
||||||
builder.setEventTime(eventTime)
|
builder.setEventTime(eventTime)
|
||||||
builder.setEventType(eventType)
|
builder.setEventType(eventType)
|
||||||
|
@ -162,7 +170,7 @@ class EncryptedWriteAheadProvenanceRepositoryTest {
|
||||||
@Test
|
@Test
|
||||||
void testWriteAheadProvenanceRepositoryShouldRegisterAndRetrieveEvents() throws IOException, InterruptedException {
|
void testWriteAheadProvenanceRepositoryShouldRegisterAndRetrieveEvents() throws IOException, InterruptedException {
|
||||||
// Arrange
|
// Arrange
|
||||||
config = createConfiguration()
|
config = createConfiguration(provenanceRepositoryDirectory)
|
||||||
// Needed until NIFI-3605 is implemented
|
// Needed until NIFI-3605 is implemented
|
||||||
// config.setMaxEventFileCapacity(1L)
|
// config.setMaxEventFileCapacity(1L)
|
||||||
config.setMaxEventFileCount(1)
|
config.setMaxEventFileCount(1)
|
||||||
|
@ -170,7 +178,7 @@ class EncryptedWriteAheadProvenanceRepositoryTest {
|
||||||
repo = new WriteAheadProvenanceRepository(config)
|
repo = new WriteAheadProvenanceRepository(config)
|
||||||
repo.initialize(eventReporter, null, null, IdentifierLookup.EMPTY)
|
repo.initialize(eventReporter, null, null, IdentifierLookup.EMPTY)
|
||||||
|
|
||||||
Map attributes = ["abc": "xyz",
|
final Map attributes = ["abc": "xyz",
|
||||||
"123": "456"]
|
"123": "456"]
|
||||||
final ProvenanceEventRecord record = buildEventRecord(buildFlowFile(attributes))
|
final ProvenanceEventRecord record = buildEventRecord(buildFlowFile(attributes))
|
||||||
|
|
||||||
|
@ -187,31 +195,34 @@ class EncryptedWriteAheadProvenanceRepositoryTest {
|
||||||
final List<ProvenanceEventRecord> recoveredRecords = repo.getEvents(0L, RECORD_COUNT + 1)
|
final List<ProvenanceEventRecord> recoveredRecords = repo.getEvents(0L, RECORD_COUNT + 1)
|
||||||
|
|
||||||
// Assert
|
// Assert
|
||||||
assert recoveredRecords.size() == RECORD_COUNT
|
assertThat(recoveredRecords.size(), is(RECORD_COUNT))
|
||||||
recoveredRecords.eachWithIndex { ProvenanceEventRecord recovered, int i ->
|
recoveredRecords.eachWithIndex { ProvenanceEventRecord recovered, int i ->
|
||||||
assert recovered.getEventId() == (i as Long)
|
assertThat(recovered.getEventId(), is(i as Long))
|
||||||
assert recovered.getTransitUri() == TRANSIT_URI
|
assertThat(recovered.getTransitUri(), is(TRANSIT_URI))
|
||||||
assert recovered.getEventType() == ProvenanceEventType.RECEIVE
|
assertThat(recovered.getEventType(), is(ProvenanceEventType.RECEIVE))
|
||||||
// The UUID was added later but we care that all attributes we provided are still there
|
// The UUID was added later but we care that all attributes we provided are still there
|
||||||
assert recovered.getAttributes().entrySet().containsAll(attributes.entrySet())
|
assertThat(recovered.getAttributes().entrySet(), hasItems(attributes.entrySet().toArray() as Map.Entry<String, String>[]))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
void testShouldRegisterAndGetEvents() {
|
void testEncryptedWriteAheadProvenanceRepositoryShouldRegisterAndGetEvents() {
|
||||||
// Arrange
|
// Arrange
|
||||||
final int RECORD_COUNT = 10
|
final int RECORD_COUNT = 10
|
||||||
|
|
||||||
NiFiProperties properties = NiFiProperties.createBasicNiFiProperties(null, [
|
// ensure NiFiProperties are converted to RepositoryConfig during encrypted repo constructor
|
||||||
|
final NiFiProperties properties = NiFiProperties.createBasicNiFiProperties(null, [
|
||||||
(NiFiProperties.PROVENANCE_REPO_ENCRYPTION_KEY_PROVIDER_IMPLEMENTATION_CLASS): StaticKeyProvider.class.name,
|
(NiFiProperties.PROVENANCE_REPO_ENCRYPTION_KEY_PROVIDER_IMPLEMENTATION_CLASS): StaticKeyProvider.class.name,
|
||||||
(NiFiProperties.PROVENANCE_REPO_ENCRYPTION_KEY): KEY_HEX,
|
(NiFiProperties.PROVENANCE_REPO_ENCRYPTION_KEY): KEY_HEX,
|
||||||
(NiFiProperties.PROVENANCE_REPO_ENCRYPTION_KEY_ID): KEY_ID
|
(NiFiProperties.PROVENANCE_REPO_ENCRYPTION_KEY_ID): KEY_ID,
|
||||||
|
(NiFiProperties.PROVENANCE_REPO_DIRECTORY_PREFIX + "test"): provenanceRepositoryDirectory.toString()
|
||||||
])
|
])
|
||||||
|
|
||||||
repo = new EncryptedWriteAheadProvenanceRepository(properties)
|
repo = new EncryptedWriteAheadProvenanceRepository(properties)
|
||||||
|
config = repo.getConfig()
|
||||||
repo.initialize(eventReporter, null, null, IdentifierLookup.EMPTY)
|
repo.initialize(eventReporter, null, null, IdentifierLookup.EMPTY)
|
||||||
|
|
||||||
Map attributes = ["abc": "This is a plaintext attribute.",
|
final Map attributes = ["abc": "This is a plaintext attribute.",
|
||||||
"123": "This is another plaintext attribute."]
|
"123": "This is another plaintext attribute."]
|
||||||
final List<ProvenanceEventRecord> records = []
|
final List<ProvenanceEventRecord> records = []
|
||||||
RECORD_COUNT.times { int i ->
|
RECORD_COUNT.times { int i ->
|
||||||
|
@ -224,16 +235,16 @@ class EncryptedWriteAheadProvenanceRepositoryTest {
|
||||||
repo.registerEvents(records)
|
repo.registerEvents(records)
|
||||||
|
|
||||||
// Retrieve the events through the interface
|
// Retrieve the events through the interface
|
||||||
List<ProvenanceEventRecord> recoveredRecords = repo.getEvents(LAST_RECORD_ID + 1, RECORD_COUNT * 2)
|
final List<ProvenanceEventRecord> recoveredRecords = repo.getEvents(LAST_RECORD_ID + 1, RECORD_COUNT * 2)
|
||||||
|
|
||||||
// Assert
|
// Assert
|
||||||
recoveredRecords.eachWithIndex { ProvenanceEventRecord recoveredRecord, int i ->
|
recoveredRecords.eachWithIndex { ProvenanceEventRecord recoveredRecord, int i ->
|
||||||
assert recoveredRecord.getEventId() == LAST_RECORD_ID + 1 + i
|
assertThat(recoveredRecord.getEventId(), is(LAST_RECORD_ID + 1 + i))
|
||||||
assert recoveredRecord.getTransitUri() == TRANSIT_URI
|
assertThat(recoveredRecord.getTransitUri(), is(TRANSIT_URI))
|
||||||
assert recoveredRecord.getEventType() == ProvenanceEventType.RECEIVE
|
assertThat(recoveredRecord.getEventType(), is(ProvenanceEventType.RECEIVE))
|
||||||
// The UUID was added later but we care that all attributes we provided are still there
|
// The UUID was added later but we care that all attributes we provided are still there
|
||||||
assert recoveredRecord.getAttributes().entrySet().containsAll(attributes.entrySet())
|
assertThat(recoveredRecord.getAttributes().entrySet(), hasItems((Map.Entry<String, String>[])attributes.entrySet().toArray()))
|
||||||
assert recoveredRecord.getAttribute("count") == i as String
|
assertThat(recoveredRecord.getAttribute("count"), is(i as String))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue