NIFI-12297 Standardized File Path resolution in Persistence Providers (#7975)

This commit is contained in:
exceptionfactory 2023-11-03 11:26:11 -05:00 committed by GitHub
parent 97dd543c6a
commit c706877147
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 305 additions and 226 deletions

View File

@ -39,7 +39,12 @@ import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Map;
import java.util.UUID;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* An {@link BundlePersistenceProvider} that uses local file-system for storage.
@ -53,6 +58,8 @@ public class FileSystemBundlePersistenceProvider implements BundlePersistencePro
static final String NAR_EXTENSION = ".nar";
static final String CPP_EXTENSION = ".cpp";
private static final Pattern NUMBER_PATTERN = Pattern.compile("\\d+");
private File bundleStorageDir;
@Override
@ -71,8 +78,7 @@ public class FileSystemBundlePersistenceProvider implements BundlePersistencePro
try {
bundleStorageDir = new File(bundleStorageDirValue);
FileUtils.ensureDirectoryExistAndCanReadAndWrite(bundleStorageDir);
LOGGER.info("Configured BundlePersistenceProvider with Extension Bundle Storage Directory {}",
new Object[] {bundleStorageDir.getAbsolutePath()});
LOGGER.info("Configured BundlePersistenceProvider with Extension Bundle Storage Directory {}", bundleStorageDir.getAbsolutePath());
} catch (IOException e) {
throw new ProviderCreationException(e);
}
@ -107,7 +113,7 @@ public class FileSystemBundlePersistenceProvider implements BundlePersistencePro
}
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Writing extension bundle to {}", new Object[]{bundleFile.getAbsolutePath()});
LOGGER.debug("Writing extension bundle to {}", bundleFile.getAbsolutePath());
}
try (final OutputStream out = new FileOutputStream(bundleFile)) {
@ -124,7 +130,7 @@ public class FileSystemBundlePersistenceProvider implements BundlePersistencePro
final File bundleFile = getBundleFile(versionCoordinate);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Reading extension bundle from {}", new Object[]{bundleFile.getAbsolutePath()});
LOGGER.debug("Reading extension bundle from {}", bundleFile.getAbsolutePath());
}
try (final InputStream in = new FileInputStream(bundleFile);
@ -142,7 +148,7 @@ public class FileSystemBundlePersistenceProvider implements BundlePersistencePro
public synchronized void deleteBundleVersion(final BundleVersionCoordinate versionCoordinate) throws BundlePersistenceException {
final File bundleFile = getBundleFile(versionCoordinate);
if (!bundleFile.exists()) {
LOGGER.warn("Extension bundle content does not exist at {}", new Object[] {bundleFile.getAbsolutePath()});
LOGGER.warn("Extension bundle content does not exist at {}", bundleFile.getAbsolutePath());
return;
}
@ -152,7 +158,7 @@ public class FileSystemBundlePersistenceProvider implements BundlePersistencePro
}
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Deleted extension bundle content at {}", new Object[] {bundleFile.getAbsolutePath()});
LOGGER.debug("Deleted extension bundle content at {}", bundleFile.getAbsolutePath());
}
}
@ -160,7 +166,7 @@ public class FileSystemBundlePersistenceProvider implements BundlePersistencePro
public synchronized void deleteAllBundleVersions(final BundleCoordinate bundleCoordinate) throws BundlePersistenceException {
final File bundleDir = getBundleDirectory(bundleStorageDir, bundleCoordinate);
if (!bundleDir.exists()) {
LOGGER.warn("Extension bundle directory does not exist at {}", new Object[] {bundleDir.getAbsolutePath()});
LOGGER.warn("Extension bundle directory does not exist at {}", bundleDir.getAbsolutePath());
return;
}
@ -207,7 +213,8 @@ public class FileSystemBundlePersistenceProvider implements BundlePersistencePro
final String groupId = bundleCoordinate.getGroupId();
final String artifactId = bundleCoordinate.getArtifactId();
return new File(bundleStorageDir, sanitize(bucketId) + "/" + sanitize(groupId) + "/" + sanitize(artifactId));
final Path artifactPath = getArtifactPath(bucketId, groupId, artifactId);
return getChildLocation(bundleStorageDir, artifactPath);
}
static File getBundleVersionDirectory(final File bundleStorageDir, final BundleVersionCoordinate versionCoordinate) {
@ -216,7 +223,9 @@ public class FileSystemBundlePersistenceProvider implements BundlePersistencePro
final String artifactId = versionCoordinate.getArtifactId();
final String version = versionCoordinate.getVersion();
return new File(bundleStorageDir, sanitize(bucketId) + "/" + sanitize(groupId) + "/" + sanitize(artifactId) + "/" + sanitize(version));
final Path artifactPath = getArtifactPath(bucketId, groupId, artifactId);
final Path versionPath = Paths.get(sanitize(version)).normalize();
return getChildLocation(bundleStorageDir, artifactPath.resolve(versionPath));
}
static File getBundleFile(final File parentDir, final BundleVersionCoordinate versionCoordinate) {
@ -227,7 +236,11 @@ public class FileSystemBundlePersistenceProvider implements BundlePersistencePro
final String bundleFileExtension = getBundleFileExtension(bundleType);
final String bundleFilename = sanitize(artifactId) + "-" + sanitize(version) + bundleFileExtension;
return new File(parentDir, bundleFilename);
return getChildLocation(parentDir, Paths.get(bundleFilename));
}
static Path getArtifactPath(final String bucketId, final String groupId, final String artifactId) {
return Paths.get(getNormalizedBucketId(bucketId), sanitize(groupId), sanitize(artifactId)).normalize();
}
static String sanitize(final String input) {
@ -246,4 +259,24 @@ public class FileSystemBundlePersistenceProvider implements BundlePersistencePro
}
}
private static String getNormalizedBucketId(final String id) {
final String sanitized = FileUtils.sanitizeFilename(id).trim().toLowerCase();
final Matcher matcher = NUMBER_PATTERN.matcher(sanitized);
if (matcher.matches()) {
final int normalized = Integer.parseInt(sanitized);
return Integer.toString(normalized);
} else {
final UUID normalized = UUID.fromString(id);
return normalized.toString();
}
}
private static File getChildLocation(final File parentDir, final Path childLocation) {
final Path parentPath = parentDir.toPath().normalize();
final Path childPath = parentPath.resolve(childLocation.normalize());
if (childPath.startsWith(parentPath)) {
return childPath.toFile();
}
throw new IllegalArgumentException(String.format("Child location not valid [%s]", childLocation));
}
}

View File

@ -33,7 +33,12 @@ import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Map;
import java.util.UUID;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* A FlowPersistenceProvider that uses the local filesystem for storage.
@ -46,6 +51,8 @@ public class FileSystemFlowPersistenceProvider implements FlowPersistenceProvide
static final String SNAPSHOT_EXTENSION = ".snapshot";
private static final Pattern NUMBER_PATTERN = Pattern.compile("\\d+");
private File flowStorageDir;
@Override
@ -63,7 +70,7 @@ public class FileSystemFlowPersistenceProvider implements FlowPersistenceProvide
try {
flowStorageDir = new File(flowStorageDirValue);
FileUtils.ensureDirectoryExistAndCanReadAndWrite(flowStorageDir);
LOGGER.info("Configured FileSystemFlowPersistenceProvider with Flow Storage Directory {}", new Object[] {flowStorageDir.getAbsolutePath()});
LOGGER.info("Configured FileSystemFlowPersistenceProvider with Flow Storage Directory {}", flowStorageDir.getAbsolutePath());
} catch (IOException e) {
throw new ProviderCreationException(e);
}
@ -71,14 +78,14 @@ public class FileSystemFlowPersistenceProvider implements FlowPersistenceProvide
@Override
public synchronized void saveFlowContent(final FlowSnapshotContext context, final byte[] content) throws FlowPersistenceException {
final File bucketDir = new File(flowStorageDir, context.getBucketId());
final File bucketDir = getChildLocation(flowStorageDir, getNormalizedIdPath(context.getBucketId()));
try {
FileUtils.ensureDirectoryExistAndCanReadAndWrite(bucketDir);
} catch (IOException e) {
throw new FlowPersistenceException("Error accessing bucket directory at " + bucketDir.getAbsolutePath(), e);
}
final File flowDir = new File(bucketDir, context.getFlowId());
final File flowDir = getChildLocation(bucketDir, getNormalizedIdPath(context.getFlowId()));
try {
FileUtils.ensureDirectoryExistAndCanReadAndWrite(flowDir);
} catch (IOException e) {
@ -86,27 +93,28 @@ public class FileSystemFlowPersistenceProvider implements FlowPersistenceProvide
}
final String versionString = String.valueOf(context.getVersion());
final File versionDir = new File(flowDir, versionString);
final File versionDir = getChildLocation(flowDir, Paths.get(versionString));
try {
FileUtils.ensureDirectoryExistAndCanReadAndWrite(versionDir);
} catch (IOException e) {
throw new FlowPersistenceException("Error accessing version directory at " + versionDir.getAbsolutePath(), e);
}
final File versionFile = new File(versionDir, versionString + SNAPSHOT_EXTENSION);
final String versionExtension = versionString + SNAPSHOT_EXTENSION;
final File versionFile = getChildLocation(versionDir, Paths.get(versionExtension));
if (versionFile.exists()) {
throw new FlowPersistenceException("Unable to save, a snapshot already exists with version " + versionString);
}
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Saving snapshot with filename {}", new Object[] {versionFile.getAbsolutePath()});
LOGGER.debug("Saving snapshot with filename {}", versionFile.getAbsolutePath());
}
try (final OutputStream out = new FileOutputStream(versionFile)) {
out.write(content);
out.flush();
} catch (Exception e) {
throw new FlowPersistenceException("Unable to write snapshot to disk due to " + e.getMessage(), e);
throw new FlowPersistenceException("Unable to write snapshot to disk", e);
}
}
@ -114,7 +122,7 @@ public class FileSystemFlowPersistenceProvider implements FlowPersistenceProvide
public synchronized byte[] getFlowContent(final String bucketId, final String flowId, final int version) throws FlowPersistenceException {
final File snapshotFile = getSnapshotFile(bucketId, flowId, version);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Retrieving snapshot with filename {}", new Object[] {snapshotFile.getAbsolutePath()});
LOGGER.debug("Retrieving snapshot with filename {}", snapshotFile.getAbsolutePath());
}
if (!snapshotFile.exists()) {
@ -130,9 +138,12 @@ public class FileSystemFlowPersistenceProvider implements FlowPersistenceProvide
@Override
public synchronized void deleteAllFlowContent(final String bucketId, final String flowId) throws FlowPersistenceException {
final File flowDir = new File(flowStorageDir, bucketId + "/" + flowId);
final Path bucketIdPath = getNormalizedIdPath(bucketId);
final Path flowIdPath = getNormalizedIdPath(flowId);
final Path bucketFlowPath = bucketIdPath.resolve(flowIdPath);
final File flowDir = getChildLocation(flowStorageDir, bucketFlowPath);
if (!flowDir.exists()) {
LOGGER.debug("Snapshot directory does not exist at {}", new Object[] {flowDir.getAbsolutePath()});
LOGGER.debug("Snapshot directory does not exist at {}", flowDir.getAbsolutePath());
return;
}
@ -150,12 +161,12 @@ public class FileSystemFlowPersistenceProvider implements FlowPersistenceProvide
}
// delete the directory for the bucket if there is nothing left
final File bucketDir = new File(flowStorageDir, bucketId);
final File bucketDir = getChildLocation(flowStorageDir, getNormalizedIdPath(bucketId));
final File[] bucketFiles = bucketDir.listFiles();
if (bucketFiles.length == 0) {
if (bucketFiles == null || bucketFiles.length == 0) {
final boolean deletedBucket = bucketDir.delete();
if (!deletedBucket) {
LOGGER.error("Unable to delete bucket directory: " + flowDir.getAbsolutePath());
LOGGER.error("Unable to delete bucket directory: {}", flowDir.getAbsolutePath());
}
}
}
@ -164,7 +175,7 @@ public class FileSystemFlowPersistenceProvider implements FlowPersistenceProvide
public synchronized void deleteFlowContent(final String bucketId, final String flowId, final int version) throws FlowPersistenceException {
final File snapshotFile = getSnapshotFile(bucketId, flowId, version);
if (!snapshotFile.exists()) {
LOGGER.debug("Snapshot file does not exist at {}", new Object[] {snapshotFile.getAbsolutePath()});
LOGGER.debug("Snapshot file does not exist at {}", snapshotFile.getAbsolutePath());
return;
}
@ -174,13 +185,40 @@ public class FileSystemFlowPersistenceProvider implements FlowPersistenceProvide
}
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Deleted snapshot at {}", new Object[] {snapshotFile.getAbsolutePath()});
LOGGER.debug("Deleted snapshot at {}", snapshotFile.getAbsolutePath());
}
}
protected File getSnapshotFile(final String bucketId, final String flowId, final int version) {
final String snapshotFilename = bucketId + "/" + flowId + "/" + version + "/" + version + SNAPSHOT_EXTENSION;
return new File(flowStorageDir, snapshotFilename);
final String versionExtension = version + SNAPSHOT_EXTENSION;
final Path snapshotLocation = Paths.get(getNormalizedId(bucketId), getNormalizedId(flowId), Integer.toString(version), versionExtension);
return getChildLocation(flowStorageDir, snapshotLocation);
}
private File getChildLocation(final File parentDir, final Path childLocation) {
final Path parentPath = parentDir.toPath().normalize();
final Path childPathNormalized = childLocation.normalize();
final Path childPath = parentPath.resolve(childPathNormalized);
if (childPath.startsWith(parentPath)) {
return childPath.toFile();
}
throw new IllegalArgumentException(String.format("Child location not valid [%s]", childLocation));
}
private Path getNormalizedIdPath(final String id) {
final String normalizedId = getNormalizedId(id);
return Paths.get(normalizedId).normalize();
}
private String getNormalizedId(final String input) {
final String sanitized = FileUtils.sanitizeFilename(input).trim().toLowerCase();
final Matcher matcher = NUMBER_PATTERN.matcher(sanitized);
if (matcher.matches()) {
final int normalized = Integer.parseInt(sanitized);
return Integer.toString(normalized);
} else {
final UUID normalized = UUID.fromString(input);
return normalized.toString();
}
}
}

View File

@ -43,20 +43,28 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.Mockito.when;
public class TestFileSystemBundlePersistenceProvider {
static final String EXTENSION_STORAGE_DIR = "target/extension_storage";
static final ProviderConfigurationContext CONFIGURATION_CONTEXT = new ProviderConfigurationContext() {
@Override
public Map<String, String> getProperties() {
final Map<String,String> props = new HashMap<>();
props.put(FileSystemBundlePersistenceProvider.BUNDLE_STORAGE_DIR_PROP, EXTENSION_STORAGE_DIR);
return props;
}
private static final String BUCKET_ID = "b0000000-0000-0000-0000-000000000000";
private static final String SECOND_BUCKET_ID = "b2000000-0000-0000-0000-000000000000";
private static final String GROUP_ID = "c0000000-0000-0000-0000-000000000000";
private static final String ARTIFACT_ID = "a0000000-0000-0000-0000-000000000000";
private static final String FIRST_VERSION = "1.0.0";
private static final String SECOND_VERSION = "1.1.0";
static final ProviderConfigurationContext CONFIGURATION_CONTEXT = () -> {
final Map<String,String> props = new HashMap<>();
props.put(FileSystemBundlePersistenceProvider.BUNDLE_STORAGE_DIR_PROP, EXTENSION_STORAGE_DIR);
return props;
};
private File bundleStorageDir;
@ -82,20 +90,20 @@ public class TestFileSystemBundlePersistenceProvider {
final BundleVersionType type = BundleVersionType.NIFI_NAR;
// first version in b1
final String content1 = "g1-a1-1.0.0";
final BundleVersionCoordinate versionCoordinate1 = getVersionCoordinate("b1", "g1", "a1", "1.0.0", type);
final String content1 = String.format("%s-%s-%s", GROUP_ID, ARTIFACT_ID, FIRST_VERSION);
final BundleVersionCoordinate versionCoordinate1 = getVersionCoordinate(BUCKET_ID, GROUP_ID, ARTIFACT_ID, FIRST_VERSION, type);
createBundleVersion(fileSystemBundleProvider, versionCoordinate1 , content1);
verifyBundleVersion(bundleStorageDir, versionCoordinate1, content1);
// second version in b1
final String content2 = "g1-a1-1.1.0";
final BundleVersionCoordinate versionCoordinate2 = getVersionCoordinate("b1", "g1", "a1", "1.1.0", type);
final String content2 = String.format("%s-%s-%s", GROUP_ID, ARTIFACT_ID, SECOND_VERSION);
final BundleVersionCoordinate versionCoordinate2 = getVersionCoordinate(BUCKET_ID, GROUP_ID, ARTIFACT_ID, SECOND_VERSION, type);
createBundleVersion(fileSystemBundleProvider, versionCoordinate2, content2);
verifyBundleVersion(bundleStorageDir, versionCoordinate2, content2);
// same bundle but in b2
final String content3 = "g1-a1-1.1.0";
final BundleVersionCoordinate versionCoordinate3 = getVersionCoordinate("b2", "g1", "a1", "1.1.0", type);
final String content3 = String.format("%s-%s-%s", GROUP_ID, ARTIFACT_ID, SECOND_VERSION);
final BundleVersionCoordinate versionCoordinate3 = getVersionCoordinate(SECOND_BUCKET_ID, GROUP_ID, ARTIFACT_ID, SECOND_VERSION, type);
createBundleVersion(fileSystemBundleProvider, versionCoordinate3, content3);
verifyBundleVersion(bundleStorageDir, versionCoordinate3, content2);
}
@ -104,19 +112,14 @@ public class TestFileSystemBundlePersistenceProvider {
public void testCreateWhenBundleVersionAlreadyExists() throws IOException {
final BundleVersionType type = BundleVersionType.NIFI_NAR;
final String content1 = "g1-a1-1.0.0";
final BundleVersionCoordinate versionCoordinate = getVersionCoordinate("b1", "g1", "a1", "1.0.0", type);
final String content1 = String.format("%s-%s-%s", GROUP_ID, ARTIFACT_ID, FIRST_VERSION);
final BundleVersionCoordinate versionCoordinate = getVersionCoordinate(BUCKET_ID, GROUP_ID, ARTIFACT_ID, FIRST_VERSION, type);
createBundleVersion(fileSystemBundleProvider, versionCoordinate, content1);
verifyBundleVersion(bundleStorageDir, versionCoordinate, content1);
// try to save same bundle version that already exists
try {
final String newContent = "new content";
createBundleVersion(fileSystemBundleProvider, versionCoordinate, newContent);
fail("Should have thrown exception");
} catch (BundlePersistenceException e) {
// expected
}
final String newContent = "new content";
assertThrows(BundlePersistenceException.class, () -> createBundleVersion(fileSystemBundleProvider, versionCoordinate, newContent));
// verify existing content wasn't modified
verifyBundleVersion(bundleStorageDir, versionCoordinate, content1);
@ -126,8 +129,8 @@ public class TestFileSystemBundlePersistenceProvider {
public void testUpdateWhenBundleVersionAlreadyExists() throws IOException {
final BundleVersionType type = BundleVersionType.NIFI_NAR;
final String content1 = "g1-a1-1.0.0";
final BundleVersionCoordinate versionCoordinate = getVersionCoordinate("b1", "g1", "a1", "1.0.0", type);
final String content1 = String.format("%s-%s-%s", GROUP_ID, ARTIFACT_ID, FIRST_VERSION);
final BundleVersionCoordinate versionCoordinate = getVersionCoordinate(BUCKET_ID, GROUP_ID, ARTIFACT_ID, FIRST_VERSION, type);
createBundleVersion(fileSystemBundleProvider, versionCoordinate, content1);
verifyBundleVersion(bundleStorageDir, versionCoordinate, content1);
@ -146,17 +149,14 @@ public class TestFileSystemBundlePersistenceProvider {
@Test
public void testCreateAndGet() throws IOException {
final String bucketId = "b1";
final String groupId = "g1";
final String artifactId = "a1";
final BundleVersionType type = BundleVersionType.NIFI_NAR;
final String content1 = groupId + "-" + artifactId + "-" + "1.0.0";
final BundleVersionCoordinate versionCoordinate1 = getVersionCoordinate(bucketId, groupId, artifactId, "1.0.0", type);
final String content1 = String.format("%s-%s-%s", GROUP_ID, ARTIFACT_ID, FIRST_VERSION);
final BundleVersionCoordinate versionCoordinate1 = getVersionCoordinate(BUCKET_ID, GROUP_ID, ARTIFACT_ID, FIRST_VERSION, type);
createBundleVersion(fileSystemBundleProvider,versionCoordinate1, content1);
final String content2 = groupId + "-" + artifactId + "-" + "1.1.0";
final BundleVersionCoordinate versionCoordinate2 = getVersionCoordinate(bucketId, groupId, artifactId, "1.1.0", type);
final String content2 = String.format("%s-%s-%s", GROUP_ID, ARTIFACT_ID, SECOND_VERSION);
final BundleVersionCoordinate versionCoordinate2 = getVersionCoordinate(BUCKET_ID, GROUP_ID, ARTIFACT_ID, SECOND_VERSION, type);
createBundleVersion(fileSystemBundleProvider, versionCoordinate2, content2);
try (final OutputStream out = new ByteArrayOutputStream()) {
@ -176,30 +176,22 @@ public class TestFileSystemBundlePersistenceProvider {
@Test
public void testGetWhenDoesNotExist() throws IOException {
final String bucketId = "b1";
final String groupId = "g1";
final String artifactId = "a1";
final String version = "1.0.0";
final BundleVersionType type = BundleVersionType.NIFI_NAR;
try (final OutputStream out = new ByteArrayOutputStream()) {
final BundleVersionCoordinate versionCoordinate = getVersionCoordinate(bucketId, groupId, artifactId, version, type);
final BundleVersionCoordinate versionCoordinate = getVersionCoordinate(BUCKET_ID, GROUP_ID, ARTIFACT_ID, FIRST_VERSION, type);
assertThrows(BundlePersistenceException.class, () -> fileSystemBundleProvider.getBundleVersionContent(versionCoordinate, out));
}
}
@Test
public void testDeleteExtensionBundleVersion() throws IOException {
final String bucketId = "b1";
final String groupId = "g1";
final String artifactId = "a1";
final String version = "1.0.0";
final BundleVersionType bundleType = BundleVersionType.NIFI_NAR;
final BundleVersionCoordinate versionCoordinate = getVersionCoordinate(bucketId, groupId, artifactId, version, bundleType);
final BundleVersionCoordinate versionCoordinate = getVersionCoordinate(BUCKET_ID, GROUP_ID, ARTIFACT_ID, FIRST_VERSION, bundleType);
// create and verify the bundle version
final String content1 = groupId + "-" + artifactId + "-" + version;
final String content1 = String.format("%s-%s-%s", GROUP_ID, ARTIFACT_ID, FIRST_VERSION);
createBundleVersion(fileSystemBundleProvider, versionCoordinate, content1);
verifyBundleVersion(bundleStorageDir, versionCoordinate, content1);
@ -213,14 +205,10 @@ public class TestFileSystemBundlePersistenceProvider {
}
@Test
public void testDeleteExtensionBundleVersionWhenDoesNotExist() throws IOException {
final String bucketId = "b1";
final String groupId = "g1";
final String artifactId = "a1";
final String version = "1.0.0";
public void testDeleteExtensionBundleVersionWhenDoesNotExist() {
final BundleVersionType bundleType = BundleVersionType.NIFI_NAR;
final BundleVersionCoordinate versionCoordinate = getVersionCoordinate(bucketId, groupId, artifactId, version, bundleType);
final BundleVersionCoordinate versionCoordinate = getVersionCoordinate(BUCKET_ID, GROUP_ID, ARTIFACT_ID, FIRST_VERSION, bundleType);
// verify the bundle version does not already exist
final File bundleVersionDir = FileSystemBundlePersistenceProvider.getBundleVersionDirectory(bundleStorageDir, versionCoordinate);
@ -233,39 +221,30 @@ public class TestFileSystemBundlePersistenceProvider {
@Test
public void testDeleteAllBundleVersions() throws IOException {
final String bucketId = "b1";
final String groupId = "g1";
final String artifactId = "a1";
final String version1 = "1.0.0";
final String version2 = "2.0.0";
final BundleVersionType bundleType = BundleVersionType.NIFI_NAR;
// create and verify the bundle version 1
final String content1 = groupId + "-" + artifactId + "-" + version1;
final BundleVersionCoordinate versionCoordinate1 = getVersionCoordinate(bucketId, groupId, artifactId, version1, bundleType);
final String content1 = String.format("%s-%s-%s", GROUP_ID, ARTIFACT_ID, FIRST_VERSION);
final BundleVersionCoordinate versionCoordinate1 = getVersionCoordinate(BUCKET_ID, GROUP_ID, ARTIFACT_ID, FIRST_VERSION, bundleType);
createBundleVersion(fileSystemBundleProvider, versionCoordinate1, content1);
verifyBundleVersion(bundleStorageDir, versionCoordinate1, content1);
// create and verify the bundle version 2
final String content2 = groupId + "-" + artifactId + "-" + version2;
final BundleVersionCoordinate versionCoordinate2 = getVersionCoordinate(bucketId, groupId, artifactId, version2, bundleType);
final String content2 = String.format("%s-%s-%s", GROUP_ID, ARTIFACT_ID, SECOND_VERSION);
final BundleVersionCoordinate versionCoordinate2 = getVersionCoordinate(BUCKET_ID, GROUP_ID, ARTIFACT_ID, SECOND_VERSION, bundleType);
createBundleVersion(fileSystemBundleProvider, versionCoordinate2, content2);
verifyBundleVersion(bundleStorageDir, versionCoordinate2, content2);
assertEquals(1, bundleStorageDir.listFiles().length);
final BundleCoordinate bundleCoordinate = getBundleCoordinate(bucketId, groupId, artifactId);
final BundleCoordinate bundleCoordinate = getBundleCoordinate();
fileSystemBundleProvider.deleteAllBundleVersions(bundleCoordinate);
assertEquals(0, bundleStorageDir.listFiles().length);
}
@Test
public void testDeleteAllBundleVersionsWhenDoesNotExist() throws IOException {
final String bucketId = "b1";
final String groupId = "g1";
final String artifactId = "a1";
public void testDeleteAllBundleVersionsWhenDoesNotExist() {
assertEquals(0, bundleStorageDir.listFiles().length);
final BundleCoordinate bundleCoordinate = getBundleCoordinate(bucketId, groupId, artifactId);
final BundleCoordinate bundleCoordinate = getBundleCoordinate();
fileSystemBundleProvider.deleteAllBundleVersions(bundleCoordinate);
assertEquals(0, bundleStorageDir.listFiles().length);
}
@ -306,11 +285,11 @@ public class TestFileSystemBundlePersistenceProvider {
return coordinate;
}
private static BundleCoordinate getBundleCoordinate(final String bucketId, final String groupId, final String artifactId) {
private static BundleCoordinate getBundleCoordinate() {
final BundleCoordinate coordinate = Mockito.mock(BundleCoordinate.class);
when(coordinate.getBucketId()).thenReturn(bucketId);
when(coordinate.getGroupId()).thenReturn(groupId);
when(coordinate.getArtifactId()).thenReturn(artifactId);
when(coordinate.getBucketId()).thenReturn(BUCKET_ID);
when(coordinate.getGroupId()).thenReturn(GROUP_ID);
when(coordinate.getArtifactId()).thenReturn(ARTIFACT_ID);
return coordinate;
}

View File

@ -36,21 +36,30 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.Mockito.when;
public class TestFileSystemFlowPersistenceProvider {
private static final String BUCKET_ID = "b0000000-0000-0000-0000-000000000000";
private static final String SECOND_BUCKET_ID = "b2000000-0000-0000-0000-000000000000";
private static final String FLOW_ID = "f0000000-0000-0000-0000-000000000000";
private static final String SECOND_FLOW_ID = "f2000000-0000-0000-0000-000000000000";
private static final String FIRST_VERSION = "1.0.0";
private static final String SECOND_VERSION = "1.1.0";
static final String FLOW_STORAGE_DIR = "target/flow_storage";
static final ProviderConfigurationContext CONFIGURATION_CONTEXT = new ProviderConfigurationContext() {
@Override
public Map<String, String> getProperties() {
final Map<String,String> props = new HashMap<>();
props.put(FileSystemFlowPersistenceProvider.FLOW_STORAGE_DIR_PROP, FLOW_STORAGE_DIR);
return props;
}
static final ProviderConfigurationContext CONFIGURATION_CONTEXT = () -> {
final Map<String,String> props = new HashMap<>();
props.put(FileSystemFlowPersistenceProvider.FLOW_STORAGE_DIR_PROP, FLOW_STORAGE_DIR);
return props;
};
private File flowStorageDir;
@ -61,7 +70,7 @@ public class TestFileSystemFlowPersistenceProvider {
flowStorageDir = new File(FLOW_STORAGE_DIR);
if (flowStorageDir.exists()) {
org.apache.commons.io.FileUtils.cleanDirectory(flowStorageDir);
flowStorageDir.delete();
assertTrue(flowStorageDir.delete());
}
assertFalse(flowStorageDir.exists());
@ -73,24 +82,18 @@ public class TestFileSystemFlowPersistenceProvider {
@Test
public void testSaveSuccessfully() throws IOException {
createAndSaveSnapshot(fileSystemFlowProvider,"bucket1", "flow1", 1, "flow1v1");
verifySnapshot(flowStorageDir, "bucket1", "flow1", 1, "flow1v1");
createAndSaveSnapshot(fileSystemFlowProvider, 1, FIRST_VERSION);
verifySnapshot(flowStorageDir, 1, FIRST_VERSION);
createAndSaveSnapshot(fileSystemFlowProvider,"bucket1", "flow1", 2, "flow1v2");
verifySnapshot(flowStorageDir, "bucket1", "flow1", 2, "flow1v2");
createAndSaveSnapshot(fileSystemFlowProvider,"bucket1", "flow2", 1, "flow2v1");
verifySnapshot(flowStorageDir, "bucket1", "flow2", 1, "flow2v1");
createAndSaveSnapshot(fileSystemFlowProvider,"bucket2", "flow3", 1, "flow3v1");
verifySnapshot(flowStorageDir, "bucket2", "flow3", 1, "flow3v1");
createAndSaveSnapshot(fileSystemFlowProvider, 2, SECOND_VERSION);
verifySnapshot(flowStorageDir, 2, SECOND_VERSION);
}
@Test
public void testSaveWithExistingVersion() throws IOException {
final FlowSnapshotContext context = Mockito.mock(FlowSnapshotContext.class);
when(context.getBucketId()).thenReturn("bucket1");
when(context.getFlowId()).thenReturn("flow1");
when(context.getBucketId()).thenReturn(BUCKET_ID);
when(context.getFlowId()).thenReturn(FLOW_ID);
when(context.getVersion()).thenReturn(1);
final byte[] content = "flow1v1".getBytes(StandardCharsets.UTF_8);
@ -98,108 +101,96 @@ public class TestFileSystemFlowPersistenceProvider {
// save new content for an existing version
final byte[] content2 = "XXX".getBytes(StandardCharsets.UTF_8);
try {
fileSystemFlowProvider.saveFlowContent(context, content2);
fail("Should have thrown exception");
} catch (Exception e) {
}
assertThrows(Exception.class, () -> fileSystemFlowProvider.saveFlowContent(context, content2));
// verify the new content wasn't written
final File flowSnapshotFile = new File(flowStorageDir, "bucket1/flow1/1/1" + FileSystemFlowPersistenceProvider.SNAPSHOT_EXTENSION);
final String path = String.format("%s/%s/1/1%s", BUCKET_ID, FLOW_ID, FileSystemFlowPersistenceProvider.SNAPSHOT_EXTENSION);
final File flowSnapshotFile = new File(flowStorageDir, path);
try (InputStream in = new FileInputStream(flowSnapshotFile)) {
assertEquals("flow1v1", IOUtils.toString(in, StandardCharsets.UTF_8));
}
}
@Test
public void testSaveAndGet() throws IOException {
createAndSaveSnapshot(fileSystemFlowProvider,"bucket1", "flow1", 1, "flow1v1");
createAndSaveSnapshot(fileSystemFlowProvider,"bucket1", "flow1", 2, "flow1v2");
public void testSaveAndGet() {
createAndSaveSnapshot(fileSystemFlowProvider, 1, FIRST_VERSION);
createAndSaveSnapshot(fileSystemFlowProvider, 2, SECOND_VERSION);
final byte[] flow1v1 = fileSystemFlowProvider.getFlowContent("bucket1", "flow1", 1);
assertEquals("flow1v1", new String(flow1v1, StandardCharsets.UTF_8));
final byte[] flow1v1 = fileSystemFlowProvider.getFlowContent(BUCKET_ID, FLOW_ID, 1);
assertEquals(FIRST_VERSION, new String(flow1v1, StandardCharsets.UTF_8));
final byte[] flow1v2 = fileSystemFlowProvider.getFlowContent("bucket1", "flow1", 2);
assertEquals("flow1v2", new String(flow1v2, StandardCharsets.UTF_8));
final byte[] flow1v2 = fileSystemFlowProvider.getFlowContent(BUCKET_ID, FLOW_ID, 2);
assertEquals(SECOND_VERSION, new String(flow1v2, StandardCharsets.UTF_8));
}
@Test
public void testGetWhenDoesNotExist() {
final byte[] flow1v1 = fileSystemFlowProvider.getFlowContent("bucket1", "flow1", 1);
final byte[] flow1v1 = fileSystemFlowProvider.getFlowContent(BUCKET_ID, FLOW_ID, 1);
assertNull(flow1v1);
}
@Test
public void testDeleteSnapshots() throws IOException {
final String bucketId = "bucket1";
final String flowId = "flow1";
public void testDeleteSnapshots() {
createAndSaveSnapshot(fileSystemFlowProvider, 1, FIRST_VERSION);
createAndSaveSnapshot(fileSystemFlowProvider, 2, SECOND_VERSION);
createAndSaveSnapshot(fileSystemFlowProvider, bucketId, flowId, 1, "flow1v1");
createAndSaveSnapshot(fileSystemFlowProvider, bucketId, flowId, 2, "flow1v2");
assertNotNull(fileSystemFlowProvider.getFlowContent(BUCKET_ID, FLOW_ID, 1));
assertNotNull(fileSystemFlowProvider.getFlowContent(BUCKET_ID, FLOW_ID, 2));
assertNotNull(fileSystemFlowProvider.getFlowContent(bucketId, flowId, 1));
assertNotNull(fileSystemFlowProvider.getFlowContent(bucketId, flowId, 2));
fileSystemFlowProvider.deleteAllFlowContent(BUCKET_ID, FLOW_ID);
fileSystemFlowProvider.deleteAllFlowContent(bucketId, flowId);
assertNull(fileSystemFlowProvider.getFlowContent(bucketId, flowId, 1));
assertNull(fileSystemFlowProvider.getFlowContent(bucketId, flowId, 2));
assertNull(fileSystemFlowProvider.getFlowContent(BUCKET_ID, FLOW_ID, 1));
assertNull(fileSystemFlowProvider.getFlowContent(BUCKET_ID, FLOW_ID, 2));
// delete a flow that doesn't exist
fileSystemFlowProvider.deleteAllFlowContent(bucketId, "some-other-flow");
fileSystemFlowProvider.deleteAllFlowContent(BUCKET_ID, SECOND_FLOW_ID);
// delete a bucket that doesn't exist
fileSystemFlowProvider.deleteAllFlowContent("some-other-bucket", flowId);
fileSystemFlowProvider.deleteAllFlowContent(SECOND_BUCKET_ID, FLOW_ID);
}
@Test
public void testDeleteSnapshot() throws IOException {
final String bucketId = "bucket1";
final String flowId = "flow1";
public void testDeleteSnapshot() {
createAndSaveSnapshot(fileSystemFlowProvider, 1, FIRST_VERSION);
createAndSaveSnapshot(fileSystemFlowProvider, 2, SECOND_VERSION);
createAndSaveSnapshot(fileSystemFlowProvider, bucketId, flowId, 1, "flow1v1");
createAndSaveSnapshot(fileSystemFlowProvider, bucketId, flowId, 2, "flow1v2");
assertNotNull(fileSystemFlowProvider.getFlowContent(BUCKET_ID, FLOW_ID, 1));
assertNotNull(fileSystemFlowProvider.getFlowContent(BUCKET_ID, FLOW_ID, 2));
assertNotNull(fileSystemFlowProvider.getFlowContent(bucketId, flowId, 1));
assertNotNull(fileSystemFlowProvider.getFlowContent(bucketId, flowId, 2));
fileSystemFlowProvider.deleteFlowContent(BUCKET_ID, FLOW_ID, 1);
fileSystemFlowProvider.deleteFlowContent(bucketId, flowId, 1);
assertNull(fileSystemFlowProvider.getFlowContent(BUCKET_ID, FLOW_ID, 1));
assertNotNull(fileSystemFlowProvider.getFlowContent(BUCKET_ID, FLOW_ID, 2));
assertNull(fileSystemFlowProvider.getFlowContent(bucketId, flowId, 1));
assertNotNull(fileSystemFlowProvider.getFlowContent(bucketId, flowId, 2));
fileSystemFlowProvider.deleteFlowContent(BUCKET_ID, FLOW_ID, 2);
fileSystemFlowProvider.deleteFlowContent(bucketId, flowId, 2);
assertNull(fileSystemFlowProvider.getFlowContent(bucketId, flowId, 1));
assertNull(fileSystemFlowProvider.getFlowContent(bucketId, flowId, 2));
assertNull(fileSystemFlowProvider.getFlowContent(BUCKET_ID, FLOW_ID, 1));
assertNull(fileSystemFlowProvider.getFlowContent(BUCKET_ID, FLOW_ID, 2));
// delete a version that doesn't exist
fileSystemFlowProvider.deleteFlowContent(bucketId, flowId, 3);
fileSystemFlowProvider.deleteFlowContent(BUCKET_ID, FLOW_ID, 3);
// delete a flow that doesn't exist
fileSystemFlowProvider.deleteFlowContent(bucketId, "some-other-flow", 1);
fileSystemFlowProvider.deleteFlowContent(BUCKET_ID, SECOND_FLOW_ID, 1);
// delete a bucket that doesn't exist
fileSystemFlowProvider.deleteFlowContent("some-other-bucket", flowId, 1);
fileSystemFlowProvider.deleteFlowContent(SECOND_BUCKET_ID, FLOW_ID, 1);
}
private void createAndSaveSnapshot(final FlowPersistenceProvider flowPersistenceProvider, final String bucketId, final String flowId, final int version,
final String contentString) throws IOException {
private void createAndSaveSnapshot(final FlowPersistenceProvider flowPersistenceProvider, final int version, final String contentString) {
final FlowSnapshotContext context = Mockito.mock(FlowSnapshotContext.class);
when(context.getBucketId()).thenReturn(bucketId);
when(context.getFlowId()).thenReturn(flowId);
when(context.getBucketId()).thenReturn(BUCKET_ID);
when(context.getFlowId()).thenReturn(FLOW_ID);
when(context.getVersion()).thenReturn(version);
final byte[] content = contentString.getBytes(StandardCharsets.UTF_8);
flowPersistenceProvider.saveFlowContent(context, content);
}
private void verifySnapshot(final File flowStorageDir, final String bucketId, final String flowId, final int version,
final String contentString) throws IOException {
private void verifySnapshot(final File flowStorageDir, final int version, final String contentString) throws IOException {
// verify the correct snapshot file was created
final File flowSnapshotFile = new File(flowStorageDir,
bucketId + "/" + flowId + "/" + version + "/" + version + FileSystemFlowPersistenceProvider.SNAPSHOT_EXTENSION);
BUCKET_ID + "/" + FLOW_ID + "/" + version + "/" + version + FileSystemFlowPersistenceProvider.SNAPSHOT_EXTENSION);
assertTrue(flowSnapshotFile.exists());
try (InputStream in = new FileInputStream(flowSnapshotFile)) {

View File

@ -46,12 +46,25 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Set;
import java.util.UUID;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
public class FileSystemFlowRegistryClient extends AbstractFlowRegistryClient {
private static final String TEST_FLOWS_BUCKET = "test-flows";
private static final Set<String> FLOW_IDS = Set.of(
"first-flow",
"flow-with-invalid-connection",
"port-moved-groups",
"Parent",
"Child"
);
private final ObjectMapper objectMapper = new ObjectMapper();
{
@ -77,14 +90,16 @@ public class FileSystemFlowRegistryClient extends AbstractFlowRegistryClient {
@Override
public boolean isStorageLocationApplicable(final FlowRegistryClientConfigurationContext context, final String storageLocation) {
try {
final File file = new java.io.File(URI.create(storageLocation));
final Path path = file.toPath();
final Path rootPath = getRootDirectory(context).toPath().normalize();
final URI location = URI.create(storageLocation);
final Path storageLocationPath = Paths.get(location.getPath()).normalize();
final String configuredDirectory = context.getProperty(DIRECTORY).getValue();
final Path rootPath = Paths.get(configuredDirectory);
// If this doesn't throw an Exception, the given storageLocation is relative to the root path
rootPath.relativize(path);
if (storageLocationPath.startsWith(rootPath)) {
// If this doesn't throw an Exception, the given storageLocation is relative to the root path
Objects.requireNonNull(rootPath.relativize(storageLocationPath));
} else {
return false;
}
} catch (final Exception e) {
return false;
}
@ -100,8 +115,7 @@ public class FileSystemFlowRegistryClient extends AbstractFlowRegistryClient {
throw new IOException("Cannot get listing of directory " + rootDir.getAbsolutePath());
}
final Set<FlowRegistryBucket> buckets = Arrays.stream(children).map(this::toBucket).collect(Collectors.toSet());
return buckets;
return Arrays.stream(children).map(this::toBucket).collect(Collectors.toSet());
}
private FlowRegistryBucket toBucket(final File file) {
@ -130,17 +144,14 @@ public class FileSystemFlowRegistryClient extends AbstractFlowRegistryClient {
@Override
public FlowRegistryBucket getBucket(final FlowRegistryClientConfigurationContext context, final String bucketId) {
final File rootDir = getRootDirectory(context);
final File bucketDir = new File(rootDir, bucketId);
final FlowRegistryBucket bucket = toBucket(bucketDir);
return bucket;
final File bucketDir = getChildLocation(rootDir, getValidatedBucketPath(bucketId));
return toBucket(bucketDir);
}
@Override
public RegisteredFlow registerFlow(final FlowRegistryClientConfigurationContext context, final RegisteredFlow flow) throws IOException {
final File rootDir = getRootDirectory(context);
final String bucketId = flow.getBucketIdentifier();
final File bucketDir = new File(rootDir, bucketId);
final File flowDir = new File(bucketDir, flow.getIdentifier());
final File flowDir = getFlowDirectory(context, bucketId, flow.getIdentifier());
Files.createDirectories(flowDir.toPath());
return flow;
@ -148,9 +159,7 @@ public class FileSystemFlowRegistryClient extends AbstractFlowRegistryClient {
@Override
public RegisteredFlow deregisterFlow(final FlowRegistryClientConfigurationContext context, final String bucketId, final String flowId) throws IOException {
final File rootDir = getRootDirectory(context);
final File bucketDir = new File(rootDir, bucketId);
final File flowDir = new File(bucketDir, flowId);
final File flowDir = getFlowDirectory(context, bucketId, flowId);
final File[] versionDirs = flowDir.listFiles();
@ -167,9 +176,7 @@ public class FileSystemFlowRegistryClient extends AbstractFlowRegistryClient {
@Override
public RegisteredFlow getFlow(final FlowRegistryClientConfigurationContext context, final String bucketId, final String flowId) {
final File rootDir = getRootDirectory(context);
final File bucketDir = new File(rootDir, bucketId);
final File flowDir = new File(bucketDir, flowId);
final File flowDir = getFlowDirectory(context, bucketId, flowId);
final File[] versionDirs = flowDir.listFiles();
@ -186,7 +193,7 @@ public class FileSystemFlowRegistryClient extends AbstractFlowRegistryClient {
@Override
public Set<RegisteredFlow> getFlows(final FlowRegistryClientConfigurationContext context, final String bucketId) throws IOException {
final File rootDir = getRootDirectory(context);
final File bucketDir = new File(rootDir, bucketId);
final File bucketDir = getChildLocation(rootDir, getValidatedBucketPath(bucketId));
final File[] flowDirs = bucketDir.listFiles();
if (flowDirs == null) {
throw new IOException("Could not get listing of directory " + bucketDir);
@ -203,16 +210,12 @@ public class FileSystemFlowRegistryClient extends AbstractFlowRegistryClient {
@Override
public RegisteredFlowSnapshot getFlowContents(final FlowRegistryClientConfigurationContext context, final String bucketId, final String flowId, final int version) throws IOException {
final File rootDir = getRootDirectory(context);
final File bucketDir = new File(rootDir, bucketId);
final File flowDir = new File(bucketDir, flowId);
final File versionDir = new File(flowDir, String.valueOf(version));
final File snapshotFile = new File(versionDir, "snapshot.json");
final File flowDir = getFlowDirectory(context, bucketId, flowId);
final Pattern intPattern = Pattern.compile("\\d+");
final File[] versionFiles = flowDir.listFiles(file -> intPattern.matcher(file.getName()).matches());
final JsonFactory factory = new JsonFactory(objectMapper);
final File snapshotFile = getSnapshotFile(context, bucketId, flowId, version);
try (final JsonParser parser = factory.createParser(snapshotFile)) {
final RegisteredFlowSnapshot snapshot = parser.readValueAs(RegisteredFlowSnapshot.class);
populateBucket(snapshot, bucketId);
@ -264,22 +267,19 @@ public class FileSystemFlowRegistryClient extends AbstractFlowRegistryClient {
@Override
public RegisteredFlowSnapshot registerFlowSnapshot(final FlowRegistryClientConfigurationContext context, final RegisteredFlowSnapshot flowSnapshot) throws IOException {
final File rootDir = getRootDirectory(context);
final RegisteredFlowSnapshotMetadata metadata = flowSnapshot.getSnapshotMetadata();
final String bucketId = metadata.getBucketIdentifier();
final String flowId = metadata.getFlowIdentifier();
final File flowDir = getFlowDirectory(context, bucketId, flowId);
final long version = metadata.getVersion();
final File bucketDir = new File(rootDir, bucketId);
final File flowDir = new File(bucketDir, flowId);
final File versionDir = new File(flowDir, String.valueOf(version));
final File versionDir = getChildLocation(flowDir, Paths.get(String.valueOf(version)));
// Create the directory for the version, if it doesn't exist.
if (!versionDir.exists()) {
Files.createDirectories(versionDir.toPath());
}
final File snapshotFile = new File(versionDir, "snapshot.json");
final File snapshotFile = getSnapshotFile(context, bucketId, flowId, version);
final RegisteredFlowSnapshot fullyPopulated = fullyPopulate(flowSnapshot, flowDir);
final JsonFactory factory = new JsonFactory(objectMapper);
@ -329,7 +329,7 @@ public class FileSystemFlowRegistryClient extends AbstractFlowRegistryClient {
flow.setPermissions(createAllowAllPermissions());
final File[] flowVersionDirs = flowDir.listFiles();
final int versionCount = flowVersionDirs == null ? 0 : flowVersionDirs.length;;
final int versionCount = flowVersionDirs == null ? 0 : flowVersionDirs.length;
flow.setVersionCount(versionCount);
final RegisteredFlowVersionInfo versionInfo = new RegisteredFlowVersionInfo();
@ -353,9 +353,7 @@ public class FileSystemFlowRegistryClient extends AbstractFlowRegistryClient {
@Override
public Set<RegisteredFlowSnapshotMetadata> getFlowVersions(final FlowRegistryClientConfigurationContext context, final String bucketId, final String flowId) throws IOException {
final File rootDir = getRootDirectory(context);
final File bucketDir = new File(rootDir, bucketId);
final File flowDir = new File(bucketDir, flowId);
final File flowDir = getFlowDirectory(context, bucketId, flowId);
final File[] versionDirs = flowDir.listFiles();
if (versionDirs == null) {
throw new IOException("Could not list directories of " + flowDir);
@ -379,9 +377,7 @@ public class FileSystemFlowRegistryClient extends AbstractFlowRegistryClient {
@Override
public int getLatestVersion(final FlowRegistryClientConfigurationContext context, final String bucketId, final String flowId) throws IOException {
final File rootDir = getRootDirectory(context);
final File bucketDir = new File(rootDir, bucketId);
final File flowDir = new File(bucketDir, flowId);
final File flowDir = getFlowDirectory(context, bucketId, flowId);
final File[] versionDirs = flowDir.listFiles();
if (versionDirs == null) {
throw new IOException("Cannot list directories of " + flowDir);
@ -393,4 +389,46 @@ public class FileSystemFlowRegistryClient extends AbstractFlowRegistryClient {
.max();
return greatestValue.orElse(-1);
}
private File getSnapshotFile(final FlowRegistryClientConfigurationContext context, final String bucketId, final String flowId, final long version) {
final File flowDirectory = getFlowDirectory(context, bucketId, flowId);
final File versionDirectory = getChildLocation(flowDirectory, Paths.get(String.valueOf(version)));
return new File(versionDirectory, "snapshot.json");
}
private File getFlowDirectory(final FlowRegistryClientConfigurationContext context, final String bucketId, final String flowId) {
final File rootDir = getRootDirectory(context);
final File bucketDir = getChildLocation(rootDir, getValidatedBucketPath(bucketId));
return getChildLocation(bucketDir, getFlowPath(flowId));
}
private File getChildLocation(final File parentDir, final Path childLocation) {
final Path parentPath = parentDir.toPath().normalize();
final Path childPath = parentPath.resolve(childLocation.normalize());
if (childPath.startsWith(parentPath)) {
return childPath.toFile();
}
throw new IllegalArgumentException(String.format("Child location not valid [%s]", childLocation));
}
private Path getFlowPath(final String flowId) {
final Optional<String> flowIdFound = FLOW_IDS.stream().filter(id -> id.equals(flowId)).findFirst();
if (flowIdFound.isPresent()) {
return Paths.get(flowIdFound.get());
}
try {
final UUID flowIdentifier = UUID.fromString(flowId);
return Paths.get(flowIdentifier.toString());
} catch (final RuntimeException e) {
throw new IllegalArgumentException(String.format("Flow ID [%s] not validated", flowId));
}
}
private Path getValidatedBucketPath(final String id) {
if (TEST_FLOWS_BUCKET.equals(id)) {
return Paths.get(TEST_FLOWS_BUCKET);
}
throw new IllegalArgumentException(String.format("Bucket [%s] not validated", id));
}
}

View File

@ -40,8 +40,6 @@ import org.apache.nifi.web.api.entity.SnippetEntity;
import org.apache.nifi.web.api.entity.VersionControlInformationEntity;
import org.apache.nifi.web.api.entity.VersionedFlowUpdateRequestEntity;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
@ -59,7 +57,9 @@ import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class RegistryClientIT extends NiFiSystemIT {
private static final Logger logger = LoggerFactory.getLogger(RegistryClientIT.class);
private static final String TEST_FLOWS_BUCKET = "test-flows";
private static final String FIRST_FLOW_ID = "first-flow";
/**
* Test a scenario where we have Parent Process Group with a child process group. The child group is under Version Control.
@ -89,8 +89,8 @@ public class RegistryClientIT extends NiFiSystemIT {
final ProcessorEntity terminate = util.createProcessor("TerminateFlowFile", parent.getId());
final ConnectionEntity connectionToTerminate = util.createConnection(outputPort, terminate);
final VersionControlInformationEntity childVci = util.startVersionControl(child, clientEntity, "testChangeVersionOnParentThatCascadesToChild", "Child");
final VersionControlInformationEntity parentVci = util.startVersionControl(parent, clientEntity, "testChangeVersionOnParentThatCascadesToChild", "Parent");
final VersionControlInformationEntity childVci = util.startVersionControl(child, clientEntity, TEST_FLOWS_BUCKET, "Child");
final VersionControlInformationEntity parentVci = util.startVersionControl(parent, clientEntity, TEST_FLOWS_BUCKET, "Parent");
// Change the properties of the UpdateContent processor and commit as v2
util.updateProcessorProperties(updateContents, Collections.singletonMap("Content", "Updated v2"));
@ -160,7 +160,7 @@ public class RegistryClientIT extends NiFiSystemIT {
final ConnectionEntity generateToCount = util.createConnection(generate, countProcessor, "success");
// Save the flow as v1
final VersionControlInformationEntity v1Vci = util.startVersionControl(parent, clientEntity, "testChangeConnectionDestinationRemoveOldAndMoveGroup", "Parent");
final VersionControlInformationEntity v1Vci = util.startVersionControl(parent, clientEntity, TEST_FLOWS_BUCKET, "Parent");
// Create a Terminate processor and change flow to be:
// Generate -> Terminate - remove the old Count Processor
@ -215,7 +215,7 @@ public class RegistryClientIT extends NiFiSystemIT {
util.createConnection(generate, countProcessor, "success");
// Save the flow as v1
final VersionControlInformationEntity vci = util.startVersionControl(group, clientEntity, "testControllerServiceUpdateWhileRunning", "Parent");
final VersionControlInformationEntity vci = util.startVersionControl(group, clientEntity, TEST_FLOWS_BUCKET, "Parent");
// Change the value of of the Controller Service's start value to 2000, and change the text of the GenerateFlowFile just to make it run each time the version is changed
util.updateControllerServiceProperties(service, Collections.singletonMap("Start Value", "2000"));
@ -259,7 +259,7 @@ public class RegistryClientIT extends NiFiSystemIT {
public void testChangeVersionWithPortMoveBetweenGroups() throws NiFiClientException, IOException, InterruptedException {
final FlowRegistryClientEntity clientEntity = registerClient(new File("src/test/resources/versioned-flows"));
final ProcessGroupEntity imported = getClientUtil().importFlowFromRegistry("root", clientEntity.getId(), "test-flows", "port-moved-groups", 1);
final ProcessGroupEntity imported = getClientUtil().importFlowFromRegistry("root", clientEntity.getId(), TEST_FLOWS_BUCKET, "port-moved-groups", 1);
assertNotNull(imported);
getClientUtil().assertFlowStaleAndUnmodified(imported.getId());
@ -306,7 +306,7 @@ public class RegistryClientIT extends NiFiSystemIT {
public void testRollbackOnFailure() throws NiFiClientException, IOException, InterruptedException {
final FlowRegistryClientEntity clientEntity = registerClient(new File("src/test/resources/versioned-flows"));
final ProcessGroupEntity imported = getClientUtil().importFlowFromRegistry("root", clientEntity.getId(), "test-flows", "flow-with-invalid-connection", 1);
final ProcessGroupEntity imported = getClientUtil().importFlowFromRegistry("root", clientEntity.getId(), TEST_FLOWS_BUCKET, "flow-with-invalid-connection", 1);
assertNotNull(imported);
getClientUtil().assertFlowStaleAndUnmodified(imported.getId());
@ -329,7 +329,7 @@ public class RegistryClientIT extends NiFiSystemIT {
final ProcessGroupEntity group = getClientUtil().createProcessGroup("Outer", "root");
final ProcessorEntity terminate = getClientUtil().createProcessor("TerminateFlowFile", group.getId());
final VersionControlInformationEntity vci = getClientUtil().startVersionControl(group, clientEntity, "First Bucket", "First Flow");
final VersionControlInformationEntity vci = getClientUtil().startVersionControl(group, clientEntity, TEST_FLOWS_BUCKET, FIRST_FLOW_ID);
final ProcessGroupEntity imported = getClientUtil().importFlowFromRegistry("root", vci.getVersionControlInformation());
assertNotNull(imported);
@ -352,7 +352,7 @@ public class RegistryClientIT extends NiFiSystemIT {
final ProcessGroupEntity group = getClientUtil().createProcessGroup("Outer", "root");
final ProcessorEntity terminate = getClientUtil().createProcessor("TerminateFlowFile", group.getId());
getClientUtil().startVersionControl(group, clientEntity, "First Bucket", "First Flow");
getClientUtil().startVersionControl(group, clientEntity, TEST_FLOWS_BUCKET, FIRST_FLOW_ID);
String versionedFlowState = getClientUtil().getVersionedFlowState(group.getId(), "root");
assertEquals("UP_TO_DATE", versionedFlowState);
@ -379,13 +379,13 @@ public class RegistryClientIT extends NiFiSystemIT {
// Create a top-level PG and version it with nothing in it.
final FlowRegistryClientEntity clientEntity = registerClient();
final ProcessGroupEntity outerGroup = getClientUtil().createProcessGroup("Outer", "root");
getClientUtil().startVersionControl(outerGroup, clientEntity, "First Bucket", "First Flow");
getClientUtil().startVersionControl(outerGroup, clientEntity, TEST_FLOWS_BUCKET, FIRST_FLOW_ID);
// Create a lower level PG and add a Processor.
// Commit as Version 2 of the group.
final ProcessGroupEntity inner1 = getClientUtil().createProcessGroup("Inner 1", outerGroup.getId());
ProcessorEntity terminate1 = getClientUtil().createProcessor("TerminateFlowFile", inner1.getId());
VersionControlInformationEntity vciEntity = getClientUtil().startVersionControl(outerGroup, clientEntity, "First Bucket", "First Flow");
VersionControlInformationEntity vciEntity = getClientUtil().startVersionControl(outerGroup, clientEntity, TEST_FLOWS_BUCKET, FIRST_FLOW_ID);
assertEquals(2, vciEntity.getVersionControlInformation().getVersion());
// Get an up-to-date copy of terminate1 because it should now have a non-null versioned component id
@ -407,7 +407,7 @@ public class RegistryClientIT extends NiFiSystemIT {
assertNotEquals(terminate1.getComponent().getVersionedComponentId(), terminate2.getComponent().getVersionedComponentId());
// First Control again with the newly created components
vciEntity = getClientUtil().startVersionControl(outerGroup, clientEntity, "First Bucket", "First Flow");
vciEntity = getClientUtil().startVersionControl(outerGroup, clientEntity, TEST_FLOWS_BUCKET, FIRST_FLOW_ID);
assertEquals(3, vciEntity.getVersionControlInformation().getVersion());
// Get new version of terminate2 processor and terminate1 processor. Ensure that both have version control ID's but that they are different.
@ -429,7 +429,7 @@ public class RegistryClientIT extends NiFiSystemIT {
// Commit as Version 2 of the group.
final ProcessGroupEntity innerGroup = getClientUtil().createProcessGroup("Inner 1", topLevel1.getId());
ProcessorEntity terminate1 = getClientUtil().createProcessor("TerminateFlowFile", innerGroup.getId());
VersionControlInformationEntity vciEntity = getClientUtil().startVersionControl(innerGroup, clientEntity, "First Bucket", "First Flow");
VersionControlInformationEntity vciEntity = getClientUtil().startVersionControl(innerGroup, clientEntity, TEST_FLOWS_BUCKET, FIRST_FLOW_ID);
assertEquals(1, vciEntity.getVersionControlInformation().getVersion());
// Now that the inner group is under version control, copy it and paste it to a new PG.

View File

@ -740,7 +740,7 @@ public class StatelessBasicsIT extends NiFiSystemIT {
final FlowRegistryClientEntity registryClient = registerClient();
// Register the first version of the flow
final VersionControlInformationEntity vci = getClientUtil().startVersionControl(statelessGroup, registryClient, "First Bucket", "testChangeFlowVersion");
final VersionControlInformationEntity vci = getClientUtil().startVersionControl(statelessGroup, registryClient, "test-flows", "first-flow");
waitFor(() -> VersionControlInformationDTO.UP_TO_DATE.equals(getClientUtil().getVersionControlState(statelessGroup.getId())) );
// Update the flow