NIFI-11946 Support preserving created timestamp for Registry Flows

This closes #7604

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
ravisingh 2023-08-13 19:11:16 -07:00 committed by exceptionfactory
parent af365414e9
commit 6bb2d62ffd
No known key found for this signature in database
GPG Key ID: 29B6A52D2AAE8DBA
2 changed files with 76 additions and 13 deletions

View File

@ -106,7 +106,7 @@ public class RegistryService {
this.registryUrlAliasService = Validate.notNull(registryUrlAliasService); this.registryUrlAliasService = Validate.notNull(registryUrlAliasService);
} }
private <T> void validate(T t, String invalidMessage) { private <T> void validate(T t, String invalidMessage) {
final Set<ConstraintViolation<T>> violations = validator.validate(t); final Set<ConstraintViolation<T>> violations = validator.validate(t);
if (violations.size() > 0) { if (violations.size() > 0) {
throw new ConstraintViolationException(invalidMessage, violations); throw new ConstraintViolationException(invalidMessage, violations);
@ -218,7 +218,7 @@ public class RegistryService {
final List<BucketEntity> bucketsWithSameName = metadataService.getBucketsByName(bucket.getName()); final List<BucketEntity> bucketsWithSameName = metadataService.getBucketsByName(bucket.getName());
if (bucketsWithSameName != null) { if (bucketsWithSameName != null) {
for (final BucketEntity bucketWithSameName : bucketsWithSameName) { for (final BucketEntity bucketWithSameName : bucketsWithSameName) {
if (!bucketWithSameName.getId().equals(existingBucketById.getId())){ if (!bucketWithSameName.getId().equals(existingBucketById.getId())) {
throw new IllegalStateException("A bucket with the same name already exists - " + bucket.getName()); throw new IllegalStateException("A bucket with the same name already exists - " + bucket.getName());
} }
} }
@ -339,11 +339,11 @@ public class RegistryService {
if (versionedFlow.getBucketIdentifier() == null) { if (versionedFlow.getBucketIdentifier() == null) {
versionedFlow.setBucketIdentifier(bucketIdentifier); versionedFlow.setBucketIdentifier(bucketIdentifier);
} }
final long timestamp = System.currentTimeMillis(); final long timestamp = System.currentTimeMillis();
versionedFlow.setCreatedTimestamp(timestamp); if (versionedFlow.getCreatedTimestamp() <= 0) {
versionedFlow.setCreatedTimestamp(timestamp);
}
versionedFlow.setModifiedTimestamp(timestamp); versionedFlow.setModifiedTimestamp(timestamp);
validate(versionedFlow, "Cannot create versioned flow"); validate(versionedFlow, "Cannot create versioned flow");
// ensure the bucket exists // ensure the bucket exists
@ -480,7 +480,7 @@ public class RegistryService {
final List<FlowEntity> flowsWithSameName = metadataService.getFlowsByName(existingBucket.getId(), versionedFlow.getName()); final List<FlowEntity> flowsWithSameName = metadataService.getFlowsByName(existingBucket.getId(), versionedFlow.getName());
if (flowsWithSameName != null) { if (flowsWithSameName != null) {
for (final FlowEntity flowWithSameName : flowsWithSameName) { for (final FlowEntity flowWithSameName : flowsWithSameName) {
if(!flowWithSameName.getId().equals(existingFlow.getId())) { if (!flowWithSameName.getId().equals(existingFlow.getId())) {
throw new IllegalStateException("A versioned flow with the same name already exists in the selected bucket"); throw new IllegalStateException("A versioned flow with the same name already exists in the selected bucket");
} }
} }
@ -729,7 +729,7 @@ public class RegistryService {
* Returns all versions of a flow, sorted newest to oldest. * Returns all versions of a flow, sorted newest to oldest.
* *
* @param bucketIdentifier the id of the bucket to search for the flowIdentifier * @param bucketIdentifier the id of the bucket to search for the flowIdentifier
* @param flowIdentifier the id of the flow to retrieve from the specified bucket * @param flowIdentifier the id of the flow to retrieve from the specified bucket
* @return all versions of the specified flow, sorted newest to oldest * @return all versions of the specified flow, sorted newest to oldest
*/ */
public SortedSet<VersionedFlowSnapshotMetadata> getFlowSnapshots(final String bucketIdentifier, final String flowIdentifier) { public SortedSet<VersionedFlowSnapshotMetadata> getFlowSnapshots(final String bucketIdentifier, final String flowIdentifier) {
@ -883,9 +883,9 @@ public class RegistryService {
* Returns the differences between two specified versions of a flow. * Returns the differences between two specified versions of a flow.
* *
* @param bucketIdentifier the id of the bucket the flow exists in * @param bucketIdentifier the id of the bucket the flow exists in
* @param flowIdentifier the flow to be examined * @param flowIdentifier the flow to be examined
* @param versionA the first version of the comparison * @param versionA the first version of the comparison
* @param versionB the second version of the comparison * @param versionB the second version of the comparison
* @return The differences between two specified versions, grouped by component. * @return The differences between two specified versions, grouped by component.
*/ */
public VersionedFlowDifference getFlowDiff(final String bucketIdentifier, final String flowIdentifier, public VersionedFlowDifference getFlowDiff(final String bucketIdentifier, final String flowIdentifier,
@ -949,6 +949,7 @@ public class RegistryService {
/** /**
* Group the differences in the comparison by component * Group the differences in the comparison by component
*
* @param flowDifferences The differences to group together by component * @param flowDifferences The differences to group together by component
* @return A set of componentDifferenceGroups where each entry contains a set of differences specific to that group * @return A set of componentDifferenceGroups where each entry contains a set of differences specific to that group
*/ */
@ -958,9 +959,9 @@ public class RegistryService {
ComponentDifferenceGroup group; ComponentDifferenceGroup group;
// A component may only exist on only one version for new/removed components // A component may only exist on only one version for new/removed components
VersionedComponent component = ObjectUtils.firstNonNull(diff.getComponentA(), diff.getComponentB()); VersionedComponent component = ObjectUtils.firstNonNull(diff.getComponentA(), diff.getComponentB());
if(differenceGroups.containsKey(component.getIdentifier())){ if (differenceGroups.containsKey(component.getIdentifier())) {
group = differenceGroups.get(component.getIdentifier()); group = differenceGroups.get(component.getIdentifier());
}else{ } else {
group = FlowMappings.map(component); group = FlowMappings.map(component);
differenceGroups.put(component.getIdentifier(), group); differenceGroups.put(component.getIdentifier(), group);
} }

View File

@ -57,6 +57,7 @@ import java.util.Set;
import java.util.SortedSet; import java.util.SortedSet;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.assertTrue;
@ -376,6 +377,67 @@ public class TestRegistryService {
assertEquals(versionedFlow.getDescription(), createdFlow.getDescription()); assertEquals(versionedFlow.getDescription(), createdFlow.getDescription());
} }
@Test
public void testCreateFlowWithCreatedTimestamp() {
final BucketEntity existingBucket = new BucketEntity();
existingBucket.setId("b1");
existingBucket.setName("My Bucket");
existingBucket.setDescription("This is my bucket");
existingBucket.setCreated(new Date());
when(metadataService.getBucketById(existingBucket.getId())).thenReturn(existingBucket);
final long timestamp = System.currentTimeMillis()-1000; // 1 millisecond previous
final VersionedFlow versionedFlow = new VersionedFlow();
versionedFlow.setIdentifier("f1");
versionedFlow.setName("My Flow");
versionedFlow.setBucketIdentifier("b1");
versionedFlow.setCreatedTimestamp(timestamp);
versionedFlow.setModifiedTimestamp(timestamp);
doAnswer(createFlowAnswer()).when(metadataService).createFlow(any(FlowEntity.class));
final VersionedFlow createdFlow = registryService.createFlow(versionedFlow.getBucketIdentifier(), versionedFlow);
assertNotNull(createdFlow);
assertNotNull(createdFlow.getIdentifier());
assertTrue(createdFlow.getCreatedTimestamp() > 0);
assertTrue(createdFlow.getModifiedTimestamp() > 0);
assertEquals(timestamp,createdFlow.getCreatedTimestamp());
assertNotEquals(timestamp,createdFlow.getModifiedTimestamp());
assertEquals(versionedFlow.getIdentifier(), createdFlow.getIdentifier());
assertEquals(versionedFlow.getName(), createdFlow.getName());
assertEquals(versionedFlow.getBucketIdentifier(), createdFlow.getBucketIdentifier());
assertEquals(versionedFlow.getDescription(), createdFlow.getDescription());
}
@Test
public void testCreateFlowWithPreserveSourcePropertiesTrueAndInvalidTimestamps() {
final BucketEntity existingBucket = new BucketEntity();
existingBucket.setId("b1");
existingBucket.setName("My Bucket");
existingBucket.setDescription("This is my bucket");
existingBucket.setCreated(new Date());
when(metadataService.getBucketById(existingBucket.getId())).thenReturn(existingBucket);
final long timestamp = -1;
final VersionedFlow versionedFlow = new VersionedFlow();
versionedFlow.setIdentifier("f1");
versionedFlow.setName("My Flow");
versionedFlow.setBucketIdentifier("b1");
versionedFlow.setCreatedTimestamp(timestamp);
versionedFlow.setModifiedTimestamp(timestamp);
doAnswer(createFlowAnswer()).when(metadataService).createFlow(any(FlowEntity.class));
final VersionedFlow createdFlow = registryService.createFlow(versionedFlow.getBucketIdentifier(), versionedFlow);
assertNotNull(createdFlow);
assertNotNull(createdFlow.getIdentifier());
assertTrue(createdFlow.getCreatedTimestamp() > 0);
assertTrue(createdFlow.getModifiedTimestamp() > 0);
assertEquals(versionedFlow.getIdentifier(), createdFlow.getIdentifier());
assertEquals(versionedFlow.getName(), createdFlow.getName());
assertEquals(versionedFlow.getBucketIdentifier(), createdFlow.getBucketIdentifier());
assertEquals(versionedFlow.getDescription(), createdFlow.getDescription());
}
@Test @Test
public void testGetFlowDoesNotExist() { public void testGetFlowDoesNotExist() {
when(metadataService.getFlowById(any(String.class))).thenReturn(null); when(metadataService.getFlowById(any(String.class))).thenReturn(null);
@ -385,7 +447,7 @@ public class TestRegistryService {
@Test @Test
public void testGetFlowDirectDoesNotExist() { public void testGetFlowDirectDoesNotExist() {
when(metadataService.getFlowById(any(String.class))).thenReturn(null); when(metadataService.getFlowById(any(String.class))).thenReturn(null);
assertThrows(ResourceNotFoundException.class , () -> registryService.getFlow("flow1")); assertThrows(ResourceNotFoundException.class, () -> registryService.getFlow("flow1"));
} }
@Test @Test