Adding FileSystemPool.Listener support

This commit is contained in:
Joakim Erdfelt 2022-08-04 07:38:29 -05:00
parent ecd2cabfa2
commit 74f54da67e
No known key found for this signature in database
GPG Key ID: 2D0E1FB8FE4B68B4
1 changed files with 93 additions and 2 deletions

View File

@ -51,6 +51,36 @@ public class FileSystemPool implements Dumpable
private static final Logger LOG = LoggerFactory.getLogger(FileSystemPool.class);
public static final FileSystemPool INSTANCE = new FileSystemPool();
/**
* Listener for pool events
*/
public interface Listener
{
/**
* FileSystem URI is retained for the first time
* @param fsUri the filesystem URI
*/
void onRetain(URI fsUri);
/**
* FileSystem URI exists in the pool and its reference count is incremented
* @param fsUri the filesystem URI
*/
void onIncrement(URI fsUri);
/**
* FileSystem URI exists in the pool and its reference count is decremented
* @param fsUri the filesystem URI
*/
void onDecrement(URI fsUri);
/**
* FileSystem URI exists in the pool and reached no references and has been closed
* @param fsUri the filesystem URI
*/
void onClose(URI fsUri);
}
private static final Map<String, String> ENV_MULTIRELEASE_RUNTIME;
static
@ -64,6 +94,8 @@ public class FileSystemPool implements Dumpable
private final Map<URI, Bucket> pool = new HashMap<>();
private final AutoLock poolLock = new AutoLock();
private Listener listener;
private FileSystemPool()
{
}
@ -126,11 +158,15 @@ public class FileSystemPool implements Dumpable
LOG.debug("Ref counter reached 0, closing pooled FS {}", bucket);
IO.close(bucket.fileSystem);
pool.remove(fsUri);
if (listener != null)
listener.onClose(fsUri);
}
else
{
if (LOG.isDebugEnabled())
LOG.debug("Decremented ref counter to {} for FS {}", count, bucket);
if (listener != null)
listener.onDecrement(fsUri);
}
}
catch (FileSystemNotFoundException fsnfe)
@ -204,14 +240,69 @@ public class FileSystemPool implements Dumpable
Bucket bucket = pool.get(fsUri);
if (bucket == null)
{
LOG.debug("Pooling new FS {}", fileSystem);
if (LOG.isDebugEnabled())
LOG.debug("Pooling new FS {}", fileSystem);
bucket = new Bucket(fsUri, fileSystem, mount);
pool.put(fsUri, bucket);
if (listener != null)
listener.onRetain(fsUri);
}
else
{
int count = bucket.counter.incrementAndGet();
LOG.debug("Incremented ref counter to {} for FS {}", count, fileSystem);
if (LOG.isDebugEnabled())
LOG.debug("Incremented ref counter to {} for FS {}", count, fileSystem);
if (listener != null)
listener.onIncrement(fsUri);
}
}
/**
* Set a listener on the FileSystemPool to monitor for pool events.
*
* @param listener the listener for pool events
*/
public void setListener(Listener listener)
{
try (AutoLock ignore = poolLock.lock())
{
this.listener = listener;
}
}
/**
* Show a StackTrace
*/
public static class StackLoggingListener implements Listener
{
private static final Logger LOG = LoggerFactory.getLogger(StackLoggingListener.class);
@Override
public void onRetain(URI uri)
{
if (LOG.isDebugEnabled())
LOG.debug("Retain {}", uri, new Throwable("Retain"));
}
@Override
public void onIncrement(URI uri)
{
if (LOG.isDebugEnabled())
LOG.debug("Increment {}", uri, new Throwable("Increment"));
}
@Override
public void onDecrement(URI uri)
{
if (LOG.isDebugEnabled())
LOG.debug("Decrement {}", uri, new Throwable("Decrement"));
}
@Override
public void onClose(URI uri)
{
if (LOG.isDebugEnabled())
LOG.debug("Close {}", uri, new Throwable("Close"));
}
}