Allow injection of lock parameters + correct usage of locked file write
This commit is contained in:
parent
f4b54de8ed
commit
a05f471c9c
|
@ -8,6 +8,7 @@ import java.text.SimpleDateFormat;
|
|||
import java.util.*;
|
||||
|
||||
import javax.annotation.Nonnull;
|
||||
import javax.annotation.Nullable;
|
||||
|
||||
import lombok.Getter;
|
||||
import lombok.Setter;
|
||||
|
@ -77,12 +78,13 @@ public class FilesystemPackageCacheManager extends BasePackageCacheManager imple
|
|||
|
||||
private final FilesystemPackageCacheManagerLocks locks;
|
||||
|
||||
private final FilesystemPackageCacheManagerLocks.LockParameters lockParameters;
|
||||
|
||||
// When running in testing mode, some packages are provided from the test case repository rather than by the normal means
|
||||
// the PackageProvider is responsible for this. if no package provider is defined, or it declines to handle the package,
|
||||
// then the normal means will be used
|
||||
public interface IPackageProvider {
|
||||
boolean handlesPackage(String id, String version);
|
||||
|
||||
InputStreamWithSrc provide(String id, String version) throws IOException;
|
||||
}
|
||||
|
||||
|
@ -92,6 +94,7 @@ public class FilesystemPackageCacheManager extends BasePackageCacheManager imple
|
|||
public static final String PACKAGE_VERSION_REGEX_OPT = "^[A-Za-z][A-Za-z0-9\\_\\-]*(\\.[A-Za-z0-9\\_\\-]+)+(\\#[A-Za-z0-9\\-\\_]+(\\.[A-Za-z0-9\\-\\_]+)*)?$";
|
||||
private static final Logger ourLog = LoggerFactory.getLogger(FilesystemPackageCacheManager.class);
|
||||
private static final String CACHE_VERSION = "3"; // second version - see wiki page
|
||||
|
||||
@Nonnull
|
||||
private final File cacheFolder;
|
||||
|
||||
|
@ -100,6 +103,7 @@ public class FilesystemPackageCacheManager extends BasePackageCacheManager imple
|
|||
private final Map<String, String> ciList = new HashMap<>();
|
||||
private JsonArray buildInfo;
|
||||
private boolean suppressErrors;
|
||||
|
||||
@Setter
|
||||
@Getter
|
||||
private boolean minimalMemory;
|
||||
|
@ -113,9 +117,20 @@ public class FilesystemPackageCacheManager extends BasePackageCacheManager imple
|
|||
@Getter
|
||||
private final List<PackageServer> packageServers;
|
||||
|
||||
@With
|
||||
@Getter
|
||||
private final FilesystemPackageCacheManagerLocks.LockParameters lockParameters;
|
||||
|
||||
public Builder() throws IOException {
|
||||
this.cacheFolder = getUserCacheFolder();
|
||||
this.packageServers = getPackageServersFromFHIRSettings();
|
||||
this.lockParameters = null;
|
||||
}
|
||||
|
||||
private Builder(File cacheFolder, List<PackageServer> packageServers, FilesystemPackageCacheManagerLocks.LockParameters lockParameters) {
|
||||
this.cacheFolder = cacheFolder;
|
||||
this.packageServers = packageServers;
|
||||
this.lockParameters = lockParameters;
|
||||
}
|
||||
|
||||
private File getUserCacheFolder() throws IOException {
|
||||
|
@ -143,17 +158,12 @@ public class FilesystemPackageCacheManager extends BasePackageCacheManager imple
|
|||
return PackageServer.getConfiguredServers();
|
||||
}
|
||||
|
||||
private Builder(File cacheFolder, List<PackageServer> packageServers) {
|
||||
this.cacheFolder = cacheFolder;
|
||||
this.packageServers = packageServers;
|
||||
}
|
||||
|
||||
public Builder withCacheFolder(String cacheFolderPath) throws IOException {
|
||||
File cacheFolder = ManagedFileAccess.file(cacheFolderPath);
|
||||
if (!cacheFolder.exists()) {
|
||||
throw new FHIRException("The folder '" + cacheFolder + "' could not be found");
|
||||
}
|
||||
return new Builder(cacheFolder, this.packageServers);
|
||||
return new Builder(cacheFolder, this.packageServers, this.lockParameters);
|
||||
}
|
||||
|
||||
public Builder withSystemCacheFolder() throws IOException {
|
||||
|
@ -163,11 +173,11 @@ public class FilesystemPackageCacheManager extends BasePackageCacheManager imple
|
|||
} else {
|
||||
systemCacheFolder = ManagedFileAccess.file(Utilities.path("/var", "lib", ".fhir", "packages"));
|
||||
}
|
||||
return new Builder(systemCacheFolder, this.packageServers);
|
||||
return new Builder(systemCacheFolder, this.packageServers, this.lockParameters);
|
||||
}
|
||||
|
||||
public Builder withTestingCacheFolder() throws IOException {
|
||||
return new Builder(ManagedFileAccess.file(Utilities.path("[tmp]", ".fhir", "packages")), this.packageServers);
|
||||
return new Builder(ManagedFileAccess.file(Utilities.path("[tmp]", ".fhir", "packages")), this.packageServers, this.lockParameters);
|
||||
}
|
||||
|
||||
public FilesystemPackageCacheManager build() throws IOException {
|
||||
|
@ -181,15 +191,15 @@ public class FilesystemPackageCacheManager extends BasePackageCacheManager imple
|
|||
throw e;
|
||||
}
|
||||
}
|
||||
return new FilesystemPackageCacheManager(cacheFolder, packageServers, locks);
|
||||
return new FilesystemPackageCacheManager(cacheFolder, packageServers, locks, lockParameters);
|
||||
}
|
||||
}
|
||||
|
||||
private FilesystemPackageCacheManager(@Nonnull File cacheFolder, @Nonnull List<PackageServer> packageServers, @Nonnull FilesystemPackageCacheManagerLocks locks) throws IOException {
|
||||
private FilesystemPackageCacheManager(@Nonnull File cacheFolder, @Nonnull List<PackageServer> packageServers, @Nonnull FilesystemPackageCacheManagerLocks locks, @Nullable FilesystemPackageCacheManagerLocks.LockParameters lockParameters) throws IOException {
|
||||
super(packageServers);
|
||||
this.cacheFolder = cacheFolder;
|
||||
this.locks = locks;
|
||||
|
||||
this.lockParameters = lockParameters;
|
||||
prepareCacheFolder();
|
||||
}
|
||||
|
||||
|
@ -441,7 +451,7 @@ public class FilesystemPackageCacheManager extends BasePackageCacheManager imple
|
|||
}
|
||||
|
||||
return null;
|
||||
});
|
||||
}, lockParameters);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -485,7 +495,7 @@ public class FilesystemPackageCacheManager extends BasePackageCacheManager imple
|
|||
return null;
|
||||
}
|
||||
return loadPackageInfo(path);
|
||||
});
|
||||
}, lockParameters);
|
||||
if (foundPackage != null) {
|
||||
if (foundPackage.isIndexed()){
|
||||
return foundPackage;
|
||||
|
@ -508,7 +518,7 @@ public class FilesystemPackageCacheManager extends BasePackageCacheManager imple
|
|||
String path = Utilities.path(cacheFolder, foundPackageFolder);
|
||||
output.checkIndexed(path);
|
||||
return output;
|
||||
});
|
||||
}, lockParameters);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -609,7 +619,7 @@ public class FilesystemPackageCacheManager extends BasePackageCacheManager imple
|
|||
throw e;
|
||||
}
|
||||
return npmPackage;
|
||||
});
|
||||
}, lockParameters);
|
||||
}
|
||||
|
||||
private void log(String s) {
|
||||
|
|
|
@ -1,14 +1,18 @@
|
|||
package org.hl7.fhir.utilities.npm;
|
||||
|
||||
import lombok.Getter;
|
||||
import org.hl7.fhir.utilities.TextFile;
|
||||
import lombok.With;
|
||||
import org.hl7.fhir.utilities.Utilities;
|
||||
|
||||
import javax.annotation.Nonnull;
|
||||
import javax.annotation.Nullable;
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.io.RandomAccessFile;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.channels.FileChannel;
|
||||
import java.nio.channels.FileLock;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.nio.file.*;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
@ -27,10 +31,24 @@ public class FilesystemPackageCacheManagerLocks {
|
|||
|
||||
private final File cacheFolder;
|
||||
|
||||
private final Long lockTimeoutTime;
|
||||
private static final LockParameters lockParameters = new LockParameters();
|
||||
|
||||
public static class LockParameters {
|
||||
@Getter @With
|
||||
private final long lockTimeoutTime;
|
||||
@Getter @With
|
||||
private final TimeUnit lockTimeoutTimeUnit;
|
||||
|
||||
public LockParameters() {
|
||||
this(60L, TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
public LockParameters(long lockTimeoutTime, TimeUnit lockTimeoutTimeUnit) {
|
||||
this.lockTimeoutTime = lockTimeoutTime;
|
||||
this.lockTimeoutTimeUnit = lockTimeoutTimeUnit;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This method is intended to be used only for testing purposes.
|
||||
* <p/>
|
||||
|
@ -43,21 +61,9 @@ public class FilesystemPackageCacheManagerLocks {
|
|||
* @throws IOException
|
||||
*/
|
||||
public FilesystemPackageCacheManagerLocks(File cacheFolder) throws IOException {
|
||||
this(cacheFolder, 60L, TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
private FilesystemPackageCacheManagerLocks(File cacheFolder, Long lockTimeoutTime, TimeUnit lockTimeoutTimeUnit) throws IOException {
|
||||
this.cacheFolder = cacheFolder;
|
||||
this.lockTimeoutTime = lockTimeoutTime;
|
||||
this.lockTimeoutTimeUnit = lockTimeoutTimeUnit;
|
||||
}
|
||||
|
||||
/**
|
||||
* This method is intended to be used only for testing purposes.
|
||||
*/
|
||||
protected FilesystemPackageCacheManagerLocks withLockTimeout(Long lockTimeoutTime, TimeUnit lockTimeoutTimeUnit) throws IOException {
|
||||
return new FilesystemPackageCacheManagerLocks(cacheFolder, lockTimeoutTime, lockTimeoutTimeUnit);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a single FilesystemPackageCacheManagerLocks instance for the given cacheFolder.
|
||||
|
@ -114,7 +120,7 @@ public class FilesystemPackageCacheManagerLocks {
|
|||
this.lock = lock;
|
||||
}
|
||||
|
||||
private void checkForLockFileWaitForDeleteIfExists(File lockFile) throws IOException {
|
||||
private void checkForLockFileWaitForDeleteIfExists(File lockFile, @Nonnull LockParameters lockParameters) throws IOException {
|
||||
if (!lockFile.exists()) {
|
||||
return;
|
||||
}
|
||||
|
@ -129,10 +135,11 @@ public class FilesystemPackageCacheManagerLocks {
|
|||
channel.close();
|
||||
throw new IOException("Lock file exists, but is not locked by a process: " + lockFile.getName());
|
||||
}
|
||||
System.out.println("File is locked.");
|
||||
}
|
||||
}
|
||||
try {
|
||||
waitForLockFileDeletion(lockFile);
|
||||
waitForLockFileDeletion(lockFile, lockParameters);
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
throw new IOException("Thread interrupted while waiting for lock", e);
|
||||
|
@ -143,12 +150,13 @@ public class FilesystemPackageCacheManagerLocks {
|
|||
Wait for the lock file to be deleted. If the lock file is not deleted within the timeout or if the thread is
|
||||
interrupted, an IOException is thrown.
|
||||
*/
|
||||
private void waitForLockFileDeletion(File lockFile) throws IOException, InterruptedException {
|
||||
private void waitForLockFileDeletion(File lockFile, @Nonnull LockParameters lockParameters) throws IOException, InterruptedException {
|
||||
|
||||
try (WatchService watchService = FileSystems.getDefault().newWatchService()) {
|
||||
Path dir = lockFile.getParentFile().toPath();
|
||||
dir.register(watchService, StandardWatchEventKinds.ENTRY_DELETE);
|
||||
|
||||
WatchKey key = watchService.poll(lockTimeoutTime, lockTimeoutTimeUnit);
|
||||
WatchKey key = watchService.poll(lockParameters.lockTimeoutTime, lockParameters.lockTimeoutTimeUnit);
|
||||
if (key == null) {
|
||||
// It is possible that the lock file is deleted before the watch service is registered, so if we timeout at
|
||||
// this point, we should check if the lock file still exists.
|
||||
|
@ -173,15 +181,27 @@ public class FilesystemPackageCacheManagerLocks {
|
|||
}
|
||||
|
||||
|
||||
public <T> T doReadWithLock(FilesystemPackageCacheManager.CacheLockFunction<T> f) throws IOException {
|
||||
/**
|
||||
* Wraps the execution of a package read function with the appropriate cache, package, and .lock file locking and
|
||||
* checks.
|
||||
*
|
||||
* @param function The function to execute
|
||||
* @param lockParameters The parameters for the lock
|
||||
* @return The return of the function
|
||||
* @param <T> The return type of the function
|
||||
* @throws IOException If an error occurs while managing locking.
|
||||
*/
|
||||
public <T> T doReadWithLock(FilesystemPackageCacheManager.CacheLockFunction<T> function, @Nullable LockParameters lockParameters) throws IOException {
|
||||
final LockParameters resolvedLockParameters = lockParameters != null ? lockParameters : FilesystemPackageCacheManagerLocks.lockParameters;
|
||||
|
||||
cacheLock.getLock().readLock().lock();
|
||||
lock.readLock().lock();
|
||||
|
||||
checkForLockFileWaitForDeleteIfExists(lockFile);
|
||||
checkForLockFileWaitForDeleteIfExists(lockFile, resolvedLockParameters);
|
||||
|
||||
T result = null;
|
||||
try {
|
||||
result = f.get();
|
||||
result = function.get();
|
||||
} finally {
|
||||
lock.readLock().unlock();
|
||||
cacheLock.getLock().readLock().unlock();
|
||||
|
@ -189,7 +209,19 @@ public class FilesystemPackageCacheManagerLocks {
|
|||
return result;
|
||||
}
|
||||
|
||||
public <T> T doWriteWithLock(FilesystemPackageCacheManager.CacheLockFunction<T> f) throws IOException {
|
||||
/**
|
||||
* Wraps the execution of a package write function with the appropriate cache, package, and .lock file locking and
|
||||
* checks.
|
||||
*
|
||||
* @param function The function to execute
|
||||
* @param lockParameters The parameters for the lock
|
||||
* @return The return of the function
|
||||
* @param <T> The return type of the function
|
||||
* @throws IOException If an error occurs while managing locking.
|
||||
*/
|
||||
public <T> T doWriteWithLock(FilesystemPackageCacheManager.CacheLockFunction<T> function, @Nullable LockParameters lockParameters) throws IOException {
|
||||
final LockParameters resolvedLockParameters = lockParameters != null ? lockParameters : FilesystemPackageCacheManagerLocks.lockParameters;
|
||||
|
||||
cacheLock.getLock().writeLock().lock();
|
||||
lock.writeLock().lock();
|
||||
|
||||
|
@ -198,19 +230,20 @@ public class FilesystemPackageCacheManagerLocks {
|
|||
FileLock fileLock = channel.tryLock(0, Long.MAX_VALUE, false);
|
||||
|
||||
if (fileLock == null) {
|
||||
waitForLockFileDeletion(lockFile);
|
||||
waitForLockFileDeletion(lockFile, resolvedLockParameters);
|
||||
fileLock = channel.tryLock(0, Long.MAX_VALUE, false);
|
||||
}
|
||||
if (fileLock == null) {
|
||||
throw new IOException("Failed to acquire lock on file: " + lockFile.getName());
|
||||
}
|
||||
|
||||
if (!lockFile.isFile()) {
|
||||
try {
|
||||
TextFile.stringToFile(String.valueOf(ProcessHandle.current().pid()), lockFile);
|
||||
} catch (IOException e) {
|
||||
throw new IOException("Error writing lock file.", e);
|
||||
}
|
||||
final ByteBuffer buff = ByteBuffer.wrap(String.valueOf(ProcessHandle.current().pid()).getBytes(StandardCharsets.UTF_8));
|
||||
channel.write(buff);
|
||||
}
|
||||
T result = null;
|
||||
try {
|
||||
result = f.get();
|
||||
result = function.get();
|
||||
} finally {
|
||||
fileLock.release();
|
||||
channel.close();
|
||||
|
|
|
@ -1,6 +1,5 @@
|
|||
package org.hl7.fhir.utilities.npm;
|
||||
|
||||
import org.hl7.fhir.utilities.TextFile;
|
||||
import org.hl7.fhir.utilities.filesystem.ManagedFileAccess;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
@ -12,6 +11,7 @@ import java.nio.file.Path;
|
|||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
|
@ -49,13 +49,13 @@ public class FilesystemPackageManagerLockTests {
|
|||
packageLock.doWriteWithLock(() -> {
|
||||
assertThat(packageLock.getLockFile()).exists();
|
||||
return null;
|
||||
});
|
||||
}, null);
|
||||
assertThat(packageLock.getLockFile()).doesNotExist();
|
||||
|
||||
packageLock.doReadWithLock(() -> {
|
||||
assertThat(packageLock.getLockFile()).doesNotExist();
|
||||
return null;
|
||||
});
|
||||
}, null);
|
||||
}
|
||||
|
||||
@Test void testNoPackageWriteOrReadWhileWholeCacheIsLocked() throws IOException, InterruptedException {
|
||||
|
@ -87,7 +87,7 @@ public class FilesystemPackageManagerLockTests {
|
|||
packageLock.doWriteWithLock(() -> {
|
||||
assertThat(cacheLockFinished.get()).isTrue();
|
||||
return null;
|
||||
});
|
||||
}, null);
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
@ -97,7 +97,7 @@ public class FilesystemPackageManagerLockTests {
|
|||
packageLock.doReadWithLock(() -> {
|
||||
assertThat(cacheLockFinished.get()).isTrue();
|
||||
return null;
|
||||
});
|
||||
}, null);
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
@ -133,7 +133,7 @@ public class FilesystemPackageManagerLockTests {
|
|||
assertThat(writeCount).isEqualTo(1);
|
||||
writeCounter.decrementAndGet();
|
||||
return null;
|
||||
});
|
||||
}, null);
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
@ -156,7 +156,7 @@ public class FilesystemPackageManagerLockTests {
|
|||
}
|
||||
readCounter.decrementAndGet();
|
||||
return null;
|
||||
});
|
||||
}, null);
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
@ -179,49 +179,46 @@ public class FilesystemPackageManagerLockTests {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testReadWhenLockedByFileTimesOut() throws IOException {
|
||||
FilesystemPackageCacheManagerLocks shorterTimeoutManager = filesystemPackageCacheLockManager.withLockTimeout(3L, TimeUnit.SECONDS);
|
||||
public void testReadWhenLockedByFileTimesOut() throws IOException, InterruptedException, TimeoutException {
|
||||
FilesystemPackageCacheManagerLocks shorterTimeoutManager = filesystemPackageCacheLockManager;
|
||||
final FilesystemPackageCacheManagerLocks.PackageLock packageLock = shorterTimeoutManager.getPackageLock(DUMMY_PACKAGE);
|
||||
File lockFile = createPackageLockFile();
|
||||
File lockFile = getPackageLockFile();
|
||||
Thread lockThread = LockfileUtility.lockWaitAndDeleteInNewProcess(cachePath, lockFile.getName(), 5);
|
||||
LockfileUtility.waitForLockfileCreation(cachePath,lockFile.getName());
|
||||
|
||||
Exception exception = assertThrows(IOException.class, () -> {
|
||||
packageLock.doReadWithLock(() -> {
|
||||
assertThat(lockFile).exists();
|
||||
return null;
|
||||
});
|
||||
}, new FilesystemPackageCacheManagerLocks.LockParameters(3L, TimeUnit.SECONDS));
|
||||
});
|
||||
|
||||
assertThat(exception.getMessage()).contains("Error reading package");
|
||||
assertThat(exception.getMessage()).contains("Package cache timed out waiting for lock");
|
||||
assertThat(exception.getCause().getMessage()).contains("Timeout waiting for lock file deletion: " + lockFile.getName());
|
||||
|
||||
lockThread.join();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReadWhenLockFileIsDeleted() throws IOException {
|
||||
FilesystemPackageCacheManagerLocks shorterTimeoutManager = filesystemPackageCacheLockManager.withLockTimeout(5L, TimeUnit.SECONDS);
|
||||
final FilesystemPackageCacheManagerLocks.PackageLock packageLock = shorterTimeoutManager.getPackageLock(DUMMY_PACKAGE);
|
||||
File lockFile = createPackageLockFile();
|
||||
public void testReadWhenLockFileIsDeleted() throws IOException, InterruptedException, TimeoutException {
|
||||
|
||||
Thread t = new Thread(() -> {
|
||||
try {
|
||||
Thread.sleep(2000);
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
lockFile.delete();
|
||||
});
|
||||
t.start();
|
||||
final FilesystemPackageCacheManagerLocks.PackageLock packageLock = filesystemPackageCacheLockManager.getPackageLock(DUMMY_PACKAGE);
|
||||
|
||||
final File lockFile = getPackageLockFile();
|
||||
|
||||
Thread lockThread = LockfileUtility.lockWaitAndDeleteInNewProcess(cachePath, lockFile.getName(), 5);
|
||||
LockfileUtility.waitForLockfileCreation(cachePath,lockFile.getName());
|
||||
|
||||
packageLock.doReadWithLock(() -> {
|
||||
assertThat(lockFile).doesNotExist();
|
||||
return null;
|
||||
});
|
||||
}, new FilesystemPackageCacheManagerLocks.LockParameters(10L, TimeUnit.SECONDS));
|
||||
|
||||
lockThread.join();
|
||||
}
|
||||
|
||||
private File createPackageLockFile() throws IOException {
|
||||
File lockFile = Path.of(cachePath, DUMMY_PACKAGE + ".lock").toFile();
|
||||
TextFile.stringToFile("", lockFile);
|
||||
return lockFile;
|
||||
private File getPackageLockFile() {
|
||||
return Path.of(cachePath, DUMMY_PACKAGE + ".lock").toFile();
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -16,6 +16,7 @@ import java.util.UUID;
|
|||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ScheduledThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
|
@ -118,44 +119,35 @@ public class FilesystemPackageManagerTests {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testTimeoutForLockedPackageRead() throws IOException, InterruptedException {
|
||||
public void testTimeoutForLockedPackageRead() throws IOException, InterruptedException, TimeoutException {
|
||||
String pcmPath = ManagedFileAccess.fromPath(Files.createTempDirectory("fpcm-multithreadingTest")).getAbsolutePath();
|
||||
|
||||
final FilesystemPackageCacheManager pcm = new FilesystemPackageCacheManager.Builder().withCacheFolder(pcmPath).build();
|
||||
final FilesystemPackageCacheManager pcm = new FilesystemPackageCacheManager.Builder()
|
||||
.withCacheFolder(pcmPath)
|
||||
.withLockParameters(new FilesystemPackageCacheManagerLocks.LockParameters(5,TimeUnit.SECONDS))
|
||||
.build();
|
||||
|
||||
Assertions.assertTrue(pcm.listPackages().isEmpty());
|
||||
|
||||
//Now sneak in a new lock file and directory:
|
||||
File lockFile = ManagedFileAccess.file(pcmPath, "example.fhir.uv.myig#1.2.3.lock");
|
||||
lockFile.createNewFile();
|
||||
Thread lockThread = LockfileUtility.lockWaitAndDeleteInNewProcess(pcmPath, "example.fhir.uv.myig#1.2.3.lock", 10);
|
||||
File directory = ManagedFileAccess.file(pcmPath, "example.fhir.uv.myig#1.2.3" );
|
||||
directory.mkdir();
|
||||
|
||||
LockfileUtility.waitForLockfileCreation(pcmPath, "example.fhir.uv.myig#1.2.3.lock");
|
||||
|
||||
IOException exception = assertThrows(IOException.class, () -> pcm.loadPackageFromCacheOnly("example.fhir.uv.myig", "1.2.3"));
|
||||
assertThat(exception.getMessage()).contains("Error reading package.");
|
||||
|
||||
assertThat(exception.getMessage()).contains("Package cache timed out waiting for lock");
|
||||
assertThat(exception.getCause().getMessage()).contains("Timeout waiting for lock file deletion");
|
||||
lockThread.join();
|
||||
}
|
||||
|
||||
private static Thread lockWaitAndDelete(String path, String lockFileName, int seconds) {
|
||||
Thread t = new Thread(() -> {
|
||||
ProcessBuilder processBuilder = new ProcessBuilder("java", "-cp", "target/test-classes:.", "org.hl7.fhir.utilities.npm.LockfileUtility", path, lockFileName, Integer.toString(seconds));
|
||||
try {
|
||||
Process process = processBuilder.start();
|
||||
process.getErrorStream().transferTo(System.err);
|
||||
process.getInputStream().transferTo(System.out);
|
||||
process.waitFor();
|
||||
} catch (IOException | InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
});
|
||||
t.start();
|
||||
return t;
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
@Test
|
||||
public void testReadFromCacheOnlyWaitsForLockDelete() throws IOException, InterruptedException {
|
||||
public void testReadFromCacheOnlyWaitsForLockDelete() throws IOException, InterruptedException, TimeoutException {
|
||||
String pcmPath = ManagedFileAccess.fromPath(Files.createTempDirectory("fpcm-multithreadingTest")).getAbsolutePath();
|
||||
|
||||
final FilesystemPackageCacheManager pcm = new FilesystemPackageCacheManager.Builder().withCacheFolder(pcmPath).build();
|
||||
|
@ -166,23 +158,20 @@ public class FilesystemPackageManagerTests {
|
|||
|
||||
String packageAndVersion = "example.fhir.uv.myig#1.2.3";
|
||||
|
||||
String lockFileName = packageAndVersion + ".lock";
|
||||
//Now sneak in a new lock file and directory:
|
||||
File lockFile = ManagedFileAccess.file(pcmPath, lockFileName);
|
||||
lockFile.createNewFile();
|
||||
|
||||
File directory = ManagedFileAccess.file(pcmPath, packageAndVersion);
|
||||
directory.mkdir();
|
||||
|
||||
final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
|
||||
executor.schedule(() -> {
|
||||
lockFile.delete();
|
||||
}, 5, TimeUnit.SECONDS);
|
||||
Thread lockThread = LockfileUtility.lockWaitAndDeleteInNewProcess(pcmPath, "example.fhir.uv.myig#1.2.3.lock", 5);
|
||||
LockfileUtility.waitForLockfileCreation(pcmPath, "example.fhir.uv.myig#1.2.3.lock");
|
||||
|
||||
|
||||
NpmPackage npmPackage = pcm.loadPackageFromCacheOnly("example.fhir.uv.myig", "1.2.3");
|
||||
|
||||
assertThat(npmPackage.id()).isEqualTo("example.fhir.uv.myig");
|
||||
|
||||
|
||||
lockThread.join();
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -1,14 +1,14 @@
|
|||
package org.hl7.fhir.utilities.npm;
|
||||
|
||||
import org.hl7.fhir.utilities.filesystem.ManagedFileAccess;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
import java.io.RandomAccessFile;
|
||||
import java.io.*;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.channels.FileChannel;
|
||||
import java.nio.channels.FileLock;
|
||||
import java.nio.file.Paths;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.nio.file.*;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
public class LockfileUtility {
|
||||
public static void main(String[] args) {
|
||||
|
@ -24,14 +24,61 @@ public class LockfileUtility {
|
|||
|
||||
}
|
||||
|
||||
public static void waitForLockfileCreation(String path, String lockFileName) throws InterruptedException, IOException, TimeoutException {
|
||||
if (Files.exists(Paths.get(path, lockFileName))) {
|
||||
return;
|
||||
}
|
||||
try (WatchService watchService = FileSystems.getDefault().newWatchService()) {
|
||||
Path dir = Paths.get(path);
|
||||
dir.register(watchService, StandardWatchEventKinds.ENTRY_CREATE);
|
||||
|
||||
WatchKey key = watchService.poll(10, TimeUnit.SECONDS);
|
||||
if (key == null) {
|
||||
throw new TimeoutException("Timeout waiting for lock file creation: " + lockFileName);
|
||||
}
|
||||
for (WatchEvent<?> event : key.pollEvents()) {
|
||||
WatchEvent.Kind<?> kind = event.kind();
|
||||
if (kind == StandardWatchEventKinds.ENTRY_CREATE) {
|
||||
Path createdFile = (Path) event.context();
|
||||
if (createdFile.toString().equals(lockFileName)) {
|
||||
System.out.println("Lock file created: " + lockFileName);
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
throw new TimeoutException("Timeout waiting for lock file creation: " + lockFileName);
|
||||
}
|
||||
}
|
||||
|
||||
public static Thread lockWaitAndDeleteInNewProcess(String path, String lockFileName, int seconds) {
|
||||
Thread t = new Thread(() -> {
|
||||
ProcessBuilder processBuilder = new ProcessBuilder("java", "-cp", "target/test-classes:.", "org.hl7.fhir.utilities.npm.LockfileUtility", path, lockFileName, Integer.toString(seconds));
|
||||
try {
|
||||
Process process = processBuilder.start();
|
||||
process.getErrorStream().transferTo(System.err);
|
||||
process.getInputStream().transferTo(System.out);
|
||||
process.waitFor();
|
||||
} catch (IOException | InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
});
|
||||
t.start();
|
||||
return t;
|
||||
}
|
||||
|
||||
private static void lockWaitAndDelete(String path, String lockFileName, int seconds) throws InterruptedException, IOException {
|
||||
|
||||
File file = Paths.get(path,lockFileName).toFile();
|
||||
|
||||
try (FileChannel channel = new RandomAccessFile(file.getAbsolutePath(), "rw").getChannel()) {
|
||||
FileLock fileLock = channel.tryLock(0, Long.MAX_VALUE, false);
|
||||
if (fileLock != null) {
|
||||
System.out.println("File "+lockFileName+" is locked. Waiting for " + seconds + " seconds to release");
|
||||
final ByteBuffer buff = ByteBuffer.wrap("Hello world".getBytes(StandardCharsets.UTF_8));
|
||||
channel.write(buff);
|
||||
System.out.println("File "+lockFileName+" is locked. Waiting for " + seconds + " seconds to release. ");
|
||||
Thread.sleep(seconds * 1000L);
|
||||
fileLock.release();
|
||||
System.out.println(System.currentTimeMillis());
|
||||
System.out.println("File "+lockFileName+" is released.");
|
||||
|
||||
channel.close();
|
||||
|
|
Loading…
Reference in New Issue