NIFI-13724 - Added additional metadata to events (#9239)

This commit is contained in:
Dye357 2024-10-21 12:05:48 -04:00 committed by GitHub
parent cc2cdddd49
commit 11ceb45783
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 142 additions and 32 deletions

View File

@ -28,58 +28,59 @@ import org.apache.nifi.registry.hook.EventFieldName;
import org.apache.nifi.registry.hook.EventType;
import org.apache.nifi.registry.security.authorization.user.NiFiUserUtils;
import java.util.Objects;
/**
* Factory to create Events from domain objects.
*/
public class EventFactory {
public static Event bucketCreated(final Bucket bucket) {
return new StandardEvent.Builder()
.eventType(EventType.CREATE_BUCKET)
.addField(EventFieldName.BUCKET_ID, bucket.getIdentifier())
.addField(EventFieldName.USER, NiFiUserUtils.getNiFiUserIdentity())
.build();
return bucketEvent(bucket, EventType.CREATE_BUCKET);
}
public static Event bucketUpdated(final Bucket bucket) {
return new StandardEvent.Builder()
.eventType(EventType.UPDATE_BUCKET)
.addField(EventFieldName.BUCKET_ID, bucket.getIdentifier())
.addField(EventFieldName.USER, NiFiUserUtils.getNiFiUserIdentity())
.build();
return bucketEvent(bucket, EventType.UPDATE_BUCKET);
}
public static Event bucketDeleted(final Bucket bucket) {
return bucketEvent(bucket, EventType.DELETE_BUCKET);
}
private static Event bucketEvent(final Bucket bucket, EventType eventType) {
return new StandardEvent.Builder()
.eventType(EventType.DELETE_BUCKET)
.eventType(eventType)
.addField(EventFieldName.BUCKET_ID, bucket.getIdentifier())
.addField(EventFieldName.BUCKET_NAME, bucket.getName())
.addField(EventFieldName.BUCKET_DESCRIPTION, Objects.requireNonNullElse(bucket.getDescription(), "")) //Empty string if Null
.addField(EventFieldName.CREATED_TIMESTAMP, String.valueOf(bucket.getCreatedTimestamp()))
.addField(EventFieldName.ALLOW_PUBLIC_READ, (bucket.isAllowPublicRead() == null) ? "" : String.valueOf(bucket.isAllowPublicRead())) //Empty string if Null
.addField(EventFieldName.USER, NiFiUserUtils.getNiFiUserIdentity())
.build();
}
public static Event flowCreated(final VersionedFlow versionedFlow) {
return new StandardEvent.Builder()
.eventType(EventType.CREATE_FLOW)
.addField(EventFieldName.BUCKET_ID, versionedFlow.getBucketIdentifier())
.addField(EventFieldName.FLOW_ID, versionedFlow.getIdentifier())
.addField(EventFieldName.USER, NiFiUserUtils.getNiFiUserIdentity())
.build();
return flowEvent(versionedFlow, EventType.CREATE_FLOW);
}
public static Event flowUpdated(final VersionedFlow versionedFlow) {
return new StandardEvent.Builder()
.eventType(EventType.UPDATE_FLOW)
.addField(EventFieldName.BUCKET_ID, versionedFlow.getBucketIdentifier())
.addField(EventFieldName.FLOW_ID, versionedFlow.getIdentifier())
.addField(EventFieldName.USER, NiFiUserUtils.getNiFiUserIdentity())
.build();
return flowEvent(versionedFlow, EventType.UPDATE_FLOW);
}
public static Event flowDeleted(final VersionedFlow versionedFlow) {
return flowEvent(versionedFlow, EventType.DELETE_FLOW);
}
private static Event flowEvent(final VersionedFlow versionedFlow, EventType eventType) {
return new StandardEvent.Builder()
.eventType(EventType.DELETE_FLOW)
.eventType(eventType)
.addField(EventFieldName.BUCKET_ID, versionedFlow.getBucketIdentifier())
.addField(EventFieldName.BUCKET_NAME, Objects.requireNonNullElse(versionedFlow.getBucketName(), "")) //Empty string if Null
.addField(EventFieldName.FLOW_ID, versionedFlow.getIdentifier())
.addField(EventFieldName.FLOW_NAME, Objects.requireNonNullElse(versionedFlow.getName(), ""))
.addField(EventFieldName.FLOW_DESCRIPTION, Objects.requireNonNullElse(versionedFlow.getDescription(), ""))
.addField(EventFieldName.CREATED_TIMESTAMP, String.valueOf(versionedFlow.getCreatedTimestamp()))
.addField(EventFieldName.MODIFIED_TIMESTAMP, String.valueOf(versionedFlow.getModifiedTimestamp()))
.addField(EventFieldName.USER, NiFiUserUtils.getNiFiUserIdentity())
.build();
}
@ -91,8 +92,11 @@ public class EventFactory {
return new StandardEvent.Builder()
.eventType(EventType.CREATE_FLOW_VERSION)
.addField(EventFieldName.BUCKET_ID, versionedFlowSnapshot.getSnapshotMetadata().getBucketIdentifier())
.addField(EventFieldName.BUCKET_NAME, (versionedFlowSnapshot.getBucket() == null) ? "" : Objects.requireNonNullElse(versionedFlowSnapshot.getBucket().getName(), ""))
.addField(EventFieldName.FLOW_ID, versionedFlowSnapshot.getSnapshotMetadata().getFlowIdentifier())
.addField(EventFieldName.FLOW_NAME, (versionedFlowSnapshot.getBucket() == null) ? "" : Objects.requireNonNullElse(versionedFlowSnapshot.getFlow().getName(), ""))
.addField(EventFieldName.VERSION, String.valueOf(versionedFlowSnapshot.getSnapshotMetadata().getVersion()))
.addField(EventFieldName.MODIFIED_TIMESTAMP, String.valueOf(versionedFlowSnapshot.getSnapshotMetadata().getTimestamp()))
.addField(EventFieldName.USER, versionedFlowSnapshot.getSnapshotMetadata().getAuthor())
.addField(EventFieldName.COMMENT, versionComments)
.build();

View File

@ -16,6 +16,7 @@
*/
package org.apache.nifi.registry.event;
import org.apache.nifi.flow.VersionedProcessGroup;
import org.apache.nifi.registry.authorization.User;
import org.apache.nifi.registry.authorization.UserGroup;
import org.apache.nifi.registry.bucket.Bucket;
@ -26,10 +27,10 @@ import org.apache.nifi.registry.extension.bundle.BundleVersionMetadata;
import org.apache.nifi.registry.flow.VersionedFlow;
import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata;
import org.apache.nifi.flow.VersionedProcessGroup;
import org.apache.nifi.registry.hook.Event;
import org.apache.nifi.registry.hook.EventFieldName;
import org.apache.nifi.registry.hook.EventType;
import org.apache.nifi.registry.revision.entity.RevisionInfo;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@ -46,6 +47,7 @@ public class TestEventFactory {
private BundleVersion bundleVersion;
private User user;
private UserGroup userGroup;
private RevisionInfo revisionInfo;
@BeforeEach
public void setup() {
@ -53,12 +55,20 @@ public class TestEventFactory {
bucket.setName("Bucket1");
bucket.setIdentifier(UUID.randomUUID().toString());
bucket.setCreatedTimestamp(System.currentTimeMillis());
bucket.setDescription("Bucket 1 Description");
revisionInfo = new RevisionInfo();
revisionInfo.setVersion(1L);
versionedFlow = new VersionedFlow();
versionedFlow.setIdentifier(UUID.randomUUID().toString());
versionedFlow.setName("Flow 1");
versionedFlow.setDescription("Flow 1 Description");
versionedFlow.setBucketIdentifier(bucket.getIdentifier());
versionedFlow.setBucketName(bucket.getName());
versionedFlow.setCreatedTimestamp(System.currentTimeMillis());
versionedFlow.setModifiedTimestamp(System.currentTimeMillis());
versionedFlow.setRevision(revisionInfo);
VersionedFlowSnapshotMetadata metadata = new VersionedFlowSnapshotMetadata();
metadata.setAuthor("user1");
@ -70,6 +80,8 @@ public class TestEventFactory {
versionedFlowSnapshot = new VersionedFlowSnapshot();
versionedFlowSnapshot.setSnapshotMetadata(metadata);
versionedFlowSnapshot.setFlowContents(new VersionedProcessGroup());
versionedFlowSnapshot.setFlow(versionedFlow);
versionedFlowSnapshot.setBucket(bucket);
bundle = new Bundle();
bundle.setIdentifier(UUID.randomUUID().toString());
@ -105,19 +117,74 @@ public class TestEventFactory {
event.validate();
assertEquals(EventType.CREATE_BUCKET, event.getEventType());
assertEquals(2, event.getFields().size());
assertEquals(6, event.getFields().size());
assertEquals(bucket.getIdentifier(), event.getField(EventFieldName.BUCKET_ID).getValue());
assertEquals("unknown", event.getField(EventFieldName.USER).getValue());
}
@Test
public void testBucketCreatedEventWithNulls() {
Bucket bucket = new Bucket();
bucket.setName("test-bucket");
bucket.setIdentifier(UUID.randomUUID().toString());
final Event event = EventFactory.bucketCreated(bucket);
assertEquals(EventType.CREATE_BUCKET, event.getEventType());
assertEquals(6, event.getFields().size());
//Assert null values are empty Strings.
assertEquals( "", event.getField(EventFieldName.BUCKET_DESCRIPTION).getValue());
assertEquals("", event.getField(EventFieldName.ALLOW_PUBLIC_READ).getValue());
assertEquals( "0", event.getField(EventFieldName.CREATED_TIMESTAMP).getValue());
}
@Test
public void testFlowCreatedEventWithNulls() {
VersionedFlow versionedFlow = new VersionedFlow();
versionedFlow.setIdentifier(UUID.randomUUID().toString());
versionedFlow.setBucketIdentifier(UUID.randomUUID().toString());
versionedFlow.setBucketIdentifier(UUID.randomUUID().toString());
final Event event = EventFactory.flowCreated(versionedFlow);
assertEquals(EventType.CREATE_FLOW, event.getEventType());
assertEquals(8, event.getFields().size());
//Assert null values are empty Strings.
assertEquals( "", event.getField(EventFieldName.BUCKET_NAME).getValue());
assertEquals( "", event.getField(EventFieldName.FLOW_NAME).getValue());
assertEquals( "0", event.getField(EventFieldName.CREATED_TIMESTAMP).getValue());
assertEquals( "0", event.getField(EventFieldName.MODIFIED_TIMESTAMP).getValue());
}
@Test
public void testFlowMetaDataCreatedEventWithNulls() {
VersionedFlowSnapshot flowSnapshot = new VersionedFlowSnapshot();
VersionedFlowSnapshotMetadata snapshotMetadata = new VersionedFlowSnapshotMetadata();
snapshotMetadata.setComments("");
snapshotMetadata.setFlowIdentifier(UUID.randomUUID().toString());
snapshotMetadata.setVersion(0);
snapshotMetadata.setBucketIdentifier(UUID.randomUUID().toString());
snapshotMetadata.setAuthor("");
flowSnapshot.setSnapshotMetadata(snapshotMetadata);
final Event event = EventFactory.flowVersionCreated(flowSnapshot);
assertEquals(EventType.CREATE_FLOW_VERSION, event.getEventType());
assertEquals(8, event.getFields().size());
//Assert null values are empty Strings.
assertEquals( "", event.getField(EventFieldName.BUCKET_NAME).getValue());
assertEquals( "", event.getField(EventFieldName.FLOW_NAME).getValue());
assertEquals( "0", event.getField(EventFieldName.MODIFIED_TIMESTAMP).getValue());
}
@Test
public void testBucketUpdatedEvent() {
final Event event = EventFactory.bucketUpdated(bucket);
event.validate();
assertEquals(EventType.UPDATE_BUCKET, event.getEventType());
assertEquals(2, event.getFields().size());
assertEquals(6, event.getFields().size());
assertEquals(bucket.getIdentifier(), event.getField(EventFieldName.BUCKET_ID).getValue());
assertEquals("unknown", event.getField(EventFieldName.USER).getValue());
@ -129,7 +196,7 @@ public class TestEventFactory {
event.validate();
assertEquals(EventType.DELETE_BUCKET, event.getEventType());
assertEquals(2, event.getFields().size());
assertEquals(6, event.getFields().size());
assertEquals(bucket.getIdentifier(), event.getField(EventFieldName.BUCKET_ID).getValue());
assertEquals("unknown", event.getField(EventFieldName.USER).getValue());
@ -141,7 +208,7 @@ public class TestEventFactory {
event.validate();
assertEquals(EventType.CREATE_FLOW, event.getEventType());
assertEquals(3, event.getFields().size());
assertEquals(8, event.getFields().size());
assertEquals(bucket.getIdentifier(), event.getField(EventFieldName.BUCKET_ID).getValue());
assertEquals(versionedFlow.getIdentifier(), event.getField(EventFieldName.FLOW_ID).getValue());
@ -154,7 +221,7 @@ public class TestEventFactory {
event.validate();
assertEquals(EventType.UPDATE_FLOW, event.getEventType());
assertEquals(3, event.getFields().size());
assertEquals(8, event.getFields().size());
assertEquals(bucket.getIdentifier(), event.getField(EventFieldName.BUCKET_ID).getValue());
assertEquals(versionedFlow.getIdentifier(), event.getField(EventFieldName.FLOW_ID).getValue());
@ -167,7 +234,7 @@ public class TestEventFactory {
event.validate();
assertEquals(EventType.DELETE_FLOW, event.getEventType());
assertEquals(3, event.getFields().size());
assertEquals(8, event.getFields().size());
assertEquals(bucket.getIdentifier(), event.getField(EventFieldName.BUCKET_ID).getValue());
assertEquals(versionedFlow.getIdentifier(), event.getField(EventFieldName.FLOW_ID).getValue());
@ -180,7 +247,7 @@ public class TestEventFactory {
event.validate();
assertEquals(EventType.CREATE_FLOW_VERSION, event.getEventType());
assertEquals(5, event.getFields().size());
assertEquals(8, event.getFields().size());
assertEquals(bucket.getIdentifier(), event.getField(EventFieldName.BUCKET_ID).getValue());
assertEquals(versionedFlow.getIdentifier(), event.getField(EventFieldName.FLOW_ID).getValue());

View File

@ -53,6 +53,8 @@ public class TestEventService {
@Test
public void testPublishConsume() throws InterruptedException {
final Bucket bucket = new Bucket();
bucket.setName("bucket1");
bucket.setDescription("bucketDescription");
bucket.setIdentifier(UUID.randomUUID().toString());
final Event bucketCreatedEvent = EventFactory.bucketCreated(bucket);

View File

@ -21,8 +21,15 @@ package org.apache.nifi.registry.hook;
*/
public enum EventFieldName {
ALLOW_PUBLIC_READ,
BUCKET_ID,
BUCKET_NAME,
BUCKET_DESCRIPTION,
FLOW_ID,
FLOW_NAME,
FLOW_DESCRIPTION,
CREATED_TIMESTAMP,
MODIFIED_TIMESTAMP,
EXTENSION_BUNDLE_ID,
VERSION,
USER,

View File

@ -30,15 +30,27 @@ public enum EventType {
CREATE_BUCKET(
EventFieldName.BUCKET_ID,
EventFieldName.BUCKET_NAME,
EventFieldName.BUCKET_DESCRIPTION,
EventFieldName.CREATED_TIMESTAMP,
EventFieldName.ALLOW_PUBLIC_READ,
EventFieldName.USER),
CREATE_FLOW(
EventFieldName.BUCKET_ID,
EventFieldName.BUCKET_NAME,
EventFieldName.FLOW_ID,
EventFieldName.FLOW_NAME,
EventFieldName.FLOW_DESCRIPTION,
EventFieldName.CREATED_TIMESTAMP,
EventFieldName.MODIFIED_TIMESTAMP,
EventFieldName.USER),
CREATE_FLOW_VERSION(
EventFieldName.BUCKET_ID,
EventFieldName.BUCKET_NAME,
EventFieldName.FLOW_ID,
EventFieldName.FLOW_NAME,
EventFieldName.VERSION,
EventFieldName.MODIFIED_TIMESTAMP,
EventFieldName.USER,
EventFieldName.COMMENT),
CREATE_EXTENSION_BUNDLE(
@ -55,17 +67,35 @@ public enum EventType {
REGISTRY_START(),
UPDATE_BUCKET(
EventFieldName.BUCKET_ID,
EventFieldName.BUCKET_NAME,
EventFieldName.BUCKET_DESCRIPTION,
EventFieldName.CREATED_TIMESTAMP,
EventFieldName.ALLOW_PUBLIC_READ,
EventFieldName.USER),
UPDATE_FLOW(
EventFieldName.BUCKET_ID,
EventFieldName.BUCKET_NAME,
EventFieldName.FLOW_ID,
EventFieldName.FLOW_NAME,
EventFieldName.FLOW_DESCRIPTION,
EventFieldName.CREATED_TIMESTAMP,
EventFieldName.MODIFIED_TIMESTAMP,
EventFieldName.USER),
DELETE_BUCKET(
EventFieldName.BUCKET_ID,
EventFieldName.BUCKET_NAME,
EventFieldName.BUCKET_DESCRIPTION,
EventFieldName.CREATED_TIMESTAMP,
EventFieldName.ALLOW_PUBLIC_READ,
EventFieldName.USER),
DELETE_FLOW(
EventFieldName.BUCKET_ID,
EventFieldName.BUCKET_NAME,
EventFieldName.FLOW_ID,
EventFieldName.FLOW_NAME,
EventFieldName.FLOW_DESCRIPTION,
EventFieldName.CREATED_TIMESTAMP,
EventFieldName.MODIFIED_TIMESTAMP,
EventFieldName.USER),
DELETE_EXTENSION_BUNDLE(
EventFieldName.BUCKET_ID,