469241 Support more of existing Scanner behaviour for PathWatcher

This commit is contained in:
Jan Bartel 2015-06-03 18:59:43 +10:00
parent 69bf5ab46c
commit 7c375c2bce
5 changed files with 533 additions and 249 deletions

View File

@ -329,17 +329,16 @@ public class PropertyUserStore extends AbstractLifeCycle implements PathWatcher.
{ {
super.doStart(); super.doStart();
loadUsers();
if ( isHotReload() && (_configPath != null) ) if ( isHotReload() && (_configPath != null) )
{ {
this.pathWatcher = new PathWatcher(); this.pathWatcher = new PathWatcher();
this.pathWatcher.addFileWatch(_configPath); this.pathWatcher.addFileWatch(_configPath);
this.pathWatcher.addListener(this); this.pathWatcher.addListener(this);
this.pathWatcher.setNotifyExistingOnStart(false);
this.pathWatcher.start(); this.pathWatcher.start();
} }
else
{
loadUsers();
}
} }
@Override @Override

View File

@ -149,8 +149,6 @@ public class PropertyUserStoreTest
store.start(); store.start();
Thread.sleep(2000);
userCount.assertThatCount(is(3)); userCount.assertThatCount(is(3));
addAdditionalUser(usersFile,"skip: skip, roleA\n"); addAdditionalUser(usersFile,"skip: skip, roleA\n");
@ -181,8 +179,6 @@ public class PropertyUserStoreTest
store.start(); store.start();
Thread.sleep(2000);
userCount.assertThatCount(is(4)); userCount.assertThatCount(is(4));
// rewrite file with original 3 users // rewrite file with original 3 users

View File

@ -39,9 +39,11 @@ import java.nio.file.WatchService;
import java.nio.file.attribute.BasicFileAttributes; import java.nio.file.attribute.BasicFileAttributes;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.EventListener;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List; import java.util.List;
import java.util.Locale; import java.util.Locale;
import java.util.Map; import java.util.Map;
@ -252,7 +254,13 @@ public class PathWatcher extends AbstractLifeCycle implements Runnable
Config subconfig = new Config(dir); Config subconfig = new Config(dir);
subconfig.includes = this.includes; subconfig.includes = this.includes;
subconfig.excludes = this.excludes; subconfig.excludes = this.excludes;
subconfig.recurseDepth = this.recurseDepth - 1; if (dir == this.dir)
subconfig.recurseDepth = this.recurseDepth; // TODO shouldn't really do a subconfig for this
else
{
subconfig.recurseDepth = this.recurseDepth - (dir.getNameCount() - this.dir.getNameCount());
}
return subconfig; return subconfig;
} }
@ -261,6 +269,11 @@ public class PathWatcher extends AbstractLifeCycle implements Runnable
return recurseDepth; return recurseDepth;
} }
public Path getPath ()
{
return this.dir;
}
private boolean hasMatch(Path path, List<PathMatcher> matchers) private boolean hasMatch(Path path, List<PathMatcher> matchers)
{ {
for (PathMatcher matcher : matchers) for (PathMatcher matcher : matchers)
@ -356,7 +369,7 @@ public class PathWatcher extends AbstractLifeCycle implements Runnable
*/ */
public boolean shouldRecurseDirectory(Path child) public boolean shouldRecurseDirectory(Path child)
{ {
if (!child.startsWith(child)) if (!child.startsWith(dir))
{ {
// not part of parent? don't recurse // not part of parent? don't recurse
return false; return false;
@ -445,28 +458,137 @@ public class PathWatcher extends AbstractLifeCycle implements Runnable
} }
} }
public static class DepthLimitedFileVisitor extends SimpleFileVisitor<Path>
{
private Config base;
private PathWatcher watcher;
public DepthLimitedFileVisitor (PathWatcher watcher, Config base)
{
this.base = base;
this.watcher = watcher;
}
/*
* 2 situations:
*
* 1. a subtree exists at the time a dir to watch is added (eg watching /tmp/xxx and it contains aaa/)
* - will start with /tmp/xxx for which we want to register with the poller
* - want to visit each child
* - if child is file, gen add event
* - if child is dir, gen add event but ONLY register it if inside depth limit and ONLY continue visit of child if inside depth limit
* 2. a subtree is added inside a watched dir (watching /tmp/xxx, add aaa/ to xxx/)
* - will start with /tmp/xxx/aaa
* - gen add event but ONLY register it if inside depth limit and ONLY continue visit of children if inside depth limit
*
*/
@Override
public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException
{
//In a directory:
// 1. the dir is the base directory
// - register it with the poll mechanism
// - generate pending add event (iff notifiable and matches patterns)
// - continue the visit (sibling dirs, sibling files)
// 2. the dir is a subdir at some depth in the basedir's tree
// - if the level of the subdir less or equal to base's limit
// - register it wih the poll mechanism
// - generate pending add event (iff notifiable and matches patterns)
// - else stop visiting this dir
// if (base.getPath().equals(dir) || base.shouldRecurseDirectory(dir))
// {
if (!base.isExcluded(dir))
{
if (base.isIncluded(dir))
{
if (watcher.isNotifiable())
{
// Directory is specifically included in PathMatcher, then
// it should be notified as such to interested listeners
PathWatchEvent event = new PathWatchEvent(dir,PathWatchEventType.ADDED);
if (LOG.isDebugEnabled())
{
LOG.debug("Pending {}",event);
}
watcher.addToPendingList(dir, event);
}
}
if ((base.getPath().equals(dir) && base.getRecurseDepth() >= 0) || base.shouldRecurseDirectory(dir))
{
watcher.register(dir,base);
}
}
if ((base.getPath().equals(dir)&& base.getRecurseDepth() >= 0) || base.shouldRecurseDirectory(dir))
{
return FileVisitResult.CONTINUE;
}
//}
// else
//{
else
{
return FileVisitResult.SKIP_SUBTREE;
}
// }
}
@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException
{
// In a file:
// - register with poll mechanism
// - generate pending add event (iff notifiable and matches patterns)
if (base.matches(file) && watcher.isNotifiable())
{
PathWatchEvent event = new PathWatchEvent(file,PathWatchEventType.ADDED);
if (LOG.isDebugEnabled())
{
LOG.debug("Pending {}",event);
}
watcher.addToPendingList(file, event);
}
return FileVisitResult.CONTINUE;
}
}
/** /**
* Listener for path change events * Listener for path change events
*/ */
public static interface Listener public static interface Listener extends EventListener
{ {
void onPathWatchEvent(PathWatchEvent event); void onPathWatchEvent(PathWatchEvent event);
} }
public static interface EventListListener extends EventListener
{
void onPathWatchEvents(List<PathWatchEvent> events);
}
public static class PathWatchEvent public static class PathWatchEvent
{ {
private final Path path; private final Path path;
private final PathWatchEventType type; private final PathWatchEventType type;
private int count; private int count = 0;
private long timestamp;
private long lastFileSize = -1;
public PathWatchEvent(Path path, PathWatchEventType type) public PathWatchEvent(Path path, PathWatchEventType type)
{ {
this.path = path; this.path = path;
this.count = 0; this.count = 1;
this.type = type; this.type = type;
this.timestamp = System.currentTimeMillis();
} }
public PathWatchEvent(Path path, WatchEvent<Path> event) public PathWatchEvent(Path path, WatchEvent<Path> event)
@ -489,7 +611,6 @@ public class PathWatcher extends AbstractLifeCycle implements Runnable
{ {
this.type = PathWatchEventType.UNKNOWN; this.type = PathWatchEventType.UNKNOWN;
} }
this.timestamp = System.currentTimeMillis();
} }
@Override @Override
@ -526,6 +647,112 @@ public class PathWatcher extends AbstractLifeCycle implements Runnable
return true; return true;
} }
public Path getPath()
{
return path;
}
public PathWatchEventType getType()
{
return type;
}
public void incrementCount(int num)
{
count += num;
}
public int getCount()
{
return count;
}
@Override
public int hashCode()
{
final int prime = 31;
int result = 1;
result = (prime * result) + ((path == null)?0:path.hashCode());
result = (prime * result) + ((type == null)?0:type.hashCode());
return result;
}
@Override
public String toString()
{
return String.format("PathWatchEvent[%s|%s]",type,path);
}
}
public static class PathPendingEvents
{
private Path _path;
private List<PathWatchEvent> _events;
private long _timestamp;
private long _lastFileSize = -1;
public PathPendingEvents (Path path)
{
_path = path;
}
public PathPendingEvents (Path path, PathWatchEvent event)
{
this (path);
addEvent(event);
}
public void addEvent (PathWatchEvent event)
{
long now = System.currentTimeMillis();
_timestamp = now;
if (_events == null)
{
_events = new ArrayList<PathWatchEvent>();
_events.add(event);
}
else
{
//Check if the same type of event is already present, in which case we
//can increment its counter. Otherwise, add it
PathWatchEvent existingType = null;
for (PathWatchEvent e:_events)
{
if (e.getType() == event.getType())
{
existingType = e;
break;
}
}
if (existingType == null)
{
_events.add(event);
}
else
{
existingType.incrementCount(event.getCount());
}
}
}
public List<PathWatchEvent> getEvents()
{
return _events;
}
public long getTimestamp()
{
return _timestamp;
}
/** /**
* Check to see if the file referenced by this Event is quiet. * Check to see if the file referenced by this Event is quiet.
* <p> * <p>
@ -539,79 +766,27 @@ public class PathWatcher extends AbstractLifeCycle implements Runnable
* the unit of time for the expired check * the unit of time for the expired check
* @return true if expired, false if not * @return true if expired, false if not
*/ */
public boolean isQuiet(long expiredDuration, TimeUnit expiredUnit) public boolean isQuiet(long now, long expiredDuration, TimeUnit expiredUnit)
{ {
long now = System.currentTimeMillis();
long pastdue = this.timestamp + expiredUnit.toMillis(expiredDuration);
this.timestamp = now;
try long pastdue = _timestamp + expiredUnit.toMillis(expiredDuration);
{ _timestamp = now;
long fileSize = Files.size(path);
boolean fileSizeChanged = (this.lastFileSize != fileSize);
this.lastFileSize = fileSize;
if ((now > pastdue) && !fileSizeChanged) long fileSize = _path.toFile().length(); //File.length() returns 0 for non existant files
boolean fileSizeChanged = (_lastFileSize != fileSize);
_lastFileSize = fileSize;
if ((now > pastdue) && (!fileSizeChanged /*|| fileSize == 0*/))
{ {
// Quiet period timestamp has expired, and file size hasn't changed. // Quiet period timestamp has expired, and file size hasn't changed, or the file
// has been deleted.
// Consider this a quiet event now. // Consider this a quiet event now.
return true; return true;
} }
}
catch (IOException e)
{
// Currently we consider this a bad event.
// However, should we permanently consider this a bad event?
// The file size is the only trigger for this.
// If the filesystem prevents access to the file during updates
// (Like Windows), then this file size indicator has to be tried
// later.
LOG.debug("Cannot read file size: " + path,e);
}
return false; return false;
} }
public int getCount()
{
return count;
}
public Path getPath()
{
return path;
}
public long getTimestamp()
{
return timestamp;
}
public PathWatchEventType getType()
{
return type;
}
@Override
public int hashCode()
{
final int prime = 31;
int result = 1;
result = (prime * result) + ((path == null)?0:path.hashCode());
result = (prime * result) + ((type == null)?0:type.hashCode());
return result;
}
public void incrementCount(int num)
{
this.count += num;
}
@Override
public String toString()
{
return String.format("PathWatchEvent[%s|%s,count=%d]",type,path,count);
}
} }
public static enum PathWatchEventType public static enum PathWatchEventType
@ -653,15 +828,23 @@ public class PathWatcher extends AbstractLifeCycle implements Runnable
private final boolean nativeWatchService; private final boolean nativeWatchService;
private Map<WatchKey, Config> keys = new HashMap<>(); private Map<WatchKey, Config> keys = new HashMap<>();
private List<Listener> listeners = new ArrayList<>(); private List<EventListener> listeners = new ArrayList<>();
private List<PathWatchEvent> pendingAddEvents = new ArrayList<>();
/** /**
* Update Quiet Time - set to 1000 ms as default (a lower value in Windows is not supported) * Update Quiet Time - set to 1000 ms as default (a lower value in Windows is not supported)
*/ */
private long updateQuietTimeDuration = 1000; private long updateQuietTimeDuration = 1000;
private TimeUnit updateQuietTimeUnit = TimeUnit.MILLISECONDS; private TimeUnit updateQuietTimeUnit = TimeUnit.MILLISECONDS;
private Thread thread; private Thread thread;
private boolean _notifyExistingOnStart = true;
private Map<Path, PathPendingEvents> pendingEvents = new LinkedHashMap<>();
/**
* Construct new PathWatcher
* @throws IOException
*/
public PathWatcher() throws IOException public PathWatcher() throws IOException
{ {
this.watcher = FileSystems.getDefault().newWatchService(); this.watcher = FileSystems.getDefault().newWatchService();
@ -697,10 +880,10 @@ public class PathWatcher extends AbstractLifeCycle implements Runnable
} }
/** /**
* Add a directory watch configuration to the the PathWatcher. * Add a directory to watch with customized watch parameters.
* *
* @param baseDir * @param baseDir
* the base directory configuration to watch * the dir to watch with its customized config
* @throws IOException * @throws IOException
* if unable to setup the directory watch * if unable to setup the directory watch
*/ */
@ -710,67 +893,17 @@ public class PathWatcher extends AbstractLifeCycle implements Runnable
{ {
LOG.debug("Watching directory {}",baseDir); LOG.debug("Watching directory {}",baseDir);
} }
Files.walkFileTree(baseDir.dir,new SimpleFileVisitor<Path>() Files.walkFileTree(baseDir.getPath(), new DepthLimitedFileVisitor(this, baseDir));
{
@Override
public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException
{
FileVisitResult result = FileVisitResult.SKIP_SUBTREE;
if (LOG.isDebugEnabled())
{
LOG.debug("preVisitDirectory: {}",dir);
} }
// Is directory not specifically excluded?
if (!baseDir.isExcluded(dir))
{
if (baseDir.isIncluded(dir))
{
// Directory is specifically included in PathMatcher, then
// it should be notified as such to interested listeners
PathWatchEvent event = new PathWatchEvent(dir,PathWatchEventType.ADDED);
if (LOG.isDebugEnabled())
{
LOG.debug("Pending {}",event);
}
pendingAddEvents.add(event);
}
register(dir,baseDir);
// Recurse Directory, based on configured depth
if (baseDir.shouldRecurseDirectory(dir) || baseDir.dir.equals(dir))
{
result = FileVisitResult.CONTINUE;
}
}
if (LOG.isDebugEnabled())
{
LOG.debug("preVisitDirectory: result {}",result);
}
return result;
}
@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException
{
if (baseDir.matches(file))
{
PathWatchEvent event = new PathWatchEvent(file,PathWatchEventType.ADDED);
if (LOG.isDebugEnabled())
{
LOG.debug("Pending {}",event);
}
pendingAddEvents.add(event);
}
return FileVisitResult.CONTINUE;
}
});
}
/**
* Add a file or directory to watch for changes.
*
* @param file
* @throws IOException
*/
public void addFileWatch(final Path file) throws IOException public void addFileWatch(final Path file) throws IOException
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
@ -790,11 +923,21 @@ public class PathWatcher extends AbstractLifeCycle implements Runnable
addDirectoryWatch(config); addDirectoryWatch(config);
} }
public void addListener(Listener listener) /**
* Add a listener for changes the watcher notices.
*
* @param listener change listener
*/
public void addListener(EventListener listener)
{ {
listeners.add(listener); listeners.add(listener);
} }
/**
* Append some info on the paths that we are watching.
*
* @param s
*/
private void appendConfigId(StringBuilder s) private void appendConfigId(StringBuilder s)
{ {
List<Path> dirs = new ArrayList<>(); List<Path> dirs = new ArrayList<>();
@ -822,6 +965,9 @@ public class PathWatcher extends AbstractLifeCycle implements Runnable
s.append("]"); s.append("]");
} }
/**
* @see org.eclipse.jetty.util.component.AbstractLifeCycle#doStart()
*/
@Override @Override
protected void doStart() throws Exception protected void doStart() throws Exception
{ {
@ -835,6 +981,9 @@ public class PathWatcher extends AbstractLifeCycle implements Runnable
super.doStart(); super.doStart();
} }
/**
* @see org.eclipse.jetty.util.component.AbstractLifeCycle#doStop()
*/
@Override @Override
protected void doStop() throws Exception protected void doStop() throws Exception
{ {
@ -842,23 +991,71 @@ public class PathWatcher extends AbstractLifeCycle implements Runnable
super.doStop(); super.doStop();
} }
public Iterator<Listener> getListeners() /**
* Check to see if the watcher is in a state where it should generate
* watch events to the listeners. Used to determine if watcher should generate
* events for existing files and dirs on startup.
*
* @return true if the watcher should generate events to the listeners.
*/
protected boolean isNotifiable ()
{
return (isStarted() || (!isStarted() && isNotifyExistingOnStart()));
}
/**
* Get an iterator over the listeners.
*
* @return iterator over the listeners.
*/
public Iterator<EventListener> getListeners()
{ {
return listeners.iterator(); return listeners.iterator();
} }
/**
* Change the quiet time.
*
* @return the quiet time in millis
*/
public long getUpdateQuietTimeMillis() public long getUpdateQuietTimeMillis()
{ {
return TimeUnit.MILLISECONDS.convert(updateQuietTimeDuration,updateQuietTimeUnit); return TimeUnit.MILLISECONDS.convert(updateQuietTimeDuration,updateQuietTimeUnit);
} }
protected void notifyOnPathWatchEvent(PathWatchEvent event)
/**
* Generate events to the listeners.
*
* @param events
*/
protected void notifyOnPathWatchEvents (List<PathWatchEvent> events)
{ {
for (Listener listener : listeners) if (events == null || events.isEmpty())
return;
for (EventListener listener : listeners)
{
if (listener instanceof EventListListener)
{ {
try try
{ {
listener.onPathWatchEvent(event); ((EventListListener)listener).onPathWatchEvents(events);
}
catch (Throwable t)
{
LOG.warn(t);
}
}
else
{
Listener l = (Listener)listener;
for (PathWatchEvent event:events)
{
try
{
l.onPathWatchEvent(event);
} }
catch (Throwable t) catch (Throwable t)
{ {
@ -866,7 +1063,17 @@ public class PathWatcher extends AbstractLifeCycle implements Runnable
} }
} }
} }
}
}
/**
* Register a dir or a file with the WatchService.
*
* @param dir
* @param root
* @throws IOException
*/
protected void register(Path dir, Config root) throws IOException protected void register(Path dir, Config root) throws IOException
{ {
LOG.debug("Registering watch on {}",dir); LOG.debug("Registering watch on {}",dir);
@ -881,15 +1088,40 @@ public class PathWatcher extends AbstractLifeCycle implements Runnable
} }
} }
/**
* Delete a listener
* @param listener
* @return
*/
public boolean removeListener(Listener listener) public boolean removeListener(Listener listener)
{ {
return listeners.remove(listener); return listeners.remove(listener);
} }
/**
* Forever loop.
*
* Wait for the WatchService to report some filesystem events for the
* watched paths.
*
* When an event for a path first occurs, it is subjected to a quiet time.
* Subsequent events that arrive for the same path during this quiet time are
* accumulated and the timer reset. Only when the quiet time has expired are
* the accumulated events sent. MODIFY events are handled slightly differently -
* multiple MODIFY events arriving within a quiet time are coalesced into a
* single MODIFY event. Both the accumulation of events and coalescing of MODIFY
* events reduce the number and frequency of event reporting for "noisy" files (ie
* those that are undergoing rapid change).
*
* @see java.lang.Runnable#run()
*/
@Override @Override
public void run() public void run()
{ {
Map<Path, PathWatchEvent> pendingUpdateEvents = new HashMap<>();
List<PathWatchEvent> notifiableEvents = new ArrayList<PathWatchEvent>();
// Start the java.nio watching // Start the java.nio watching
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
@ -900,51 +1132,46 @@ public class PathWatcher extends AbstractLifeCycle implements Runnable
while (true) while (true)
{ {
WatchKey key = null; WatchKey key = null;
try try
{ {
// Process old events (from addDirectoryWatch()) //If no pending events, wait forever for new events
if (!pendingAddEvents.isEmpty()) if (pendingEvents.isEmpty())
{
for (PathWatchEvent event : pendingAddEvents)
{
notifyOnPathWatchEvent(event);
}
pendingAddEvents.clear();
}
// Process new events
if (pendingUpdateEvents.isEmpty())
{ {
if (NOISY_LOG.isDebugEnabled()) if (NOISY_LOG.isDebugEnabled())
{
NOISY_LOG.debug("Waiting for take()"); NOISY_LOG.debug("Waiting for take()");
}
// wait for any event
key = watcher.take(); key = watcher.take();
} }
else else
{ {
//There are existing events that might be ready to go,
//only wait as long as the quiet time for any new events
if (NOISY_LOG.isDebugEnabled()) if (NOISY_LOG.isDebugEnabled())
{
NOISY_LOG.debug("Waiting for poll({}, {})",updateQuietTimeDuration,updateQuietTimeUnit); NOISY_LOG.debug("Waiting for poll({}, {})",updateQuietTimeDuration,updateQuietTimeUnit);
}
key = watcher.poll(updateQuietTimeDuration,updateQuietTimeUnit); key = watcher.poll(updateQuietTimeDuration,updateQuietTimeUnit);
//If no new events its safe to process the pendings
if (key == null) if (key == null)
{ {
long now = System.currentTimeMillis();
// no new event encountered. // no new event encountered.
for (Path path : new HashSet<Path>(pendingUpdateEvents.keySet())) for (Path path : new HashSet<Path>(pendingEvents.keySet()))
{ {
PathWatchEvent pending = pendingUpdateEvents.get(path); PathPendingEvents pending = pendingEvents.get(path);
if (pending.isQuiet(updateQuietTimeDuration,updateQuietTimeUnit)) if (pending.isQuiet(now, updateQuietTimeDuration,updateQuietTimeUnit))
{ {
// it is expired //No fresh events received during quiet time for this path,
// notify that update is complete //so generate the events that were pent up
notifyOnPathWatchEvent(pending); for (PathWatchEvent p:pending.getEvents())
{
notifiableEvents.add(p);
}
// remove from pending list // remove from pending list
pendingUpdateEvents.remove(path); pendingEvents.remove(path);
} }
} }
continue; // loop again
} }
} }
} }
@ -966,6 +1193,10 @@ public class PathWatcher extends AbstractLifeCycle implements Runnable
return; return;
} }
//If there was some new events to process
if (key != null)
{
Config config = keys.get(key); Config config = keys.get(key);
if (config == null) if (config == null)
{ {
@ -1001,47 +1232,21 @@ public class PathWatcher extends AbstractLifeCycle implements Runnable
} }
else if (config.matches(child)) else if (config.matches(child))
{ {
notifyOnPathWatchEvent(new PathWatchEvent(child,ev)); addToPendingList(child, new PathWatchEvent(child,ev));
} }
} }
else if (config.matches(child)) else if (config.matches(child))
{ {
if (kind == ENTRY_MODIFY) addToPendingList(child, new PathWatchEvent(child,ev));
{
// handle modify events with a quiet time before they
// are notified to the listeners
PathWatchEvent pending = pendingUpdateEvents.get(child);
if (pending == null)
{
// new pending update
pendingUpdateEvents.put(child,new PathWatchEvent(child,ev));
}
else
{
// see if pending is expired
if (pending.isQuiet(updateQuietTimeDuration,updateQuietTimeUnit))
{
// it is expired, notify that update is complete
notifyOnPathWatchEvent(pending);
// remove from pending list
pendingUpdateEvents.remove(child);
}
else
{
// update the count (useful for debugging)
pending.incrementCount(ev.count());
}
}
}
else
{
notifyOnPathWatchEvent(new PathWatchEvent(child,ev));
} }
} }
} }
if (!key.reset()) //Send any notifications generated this pass
notifyOnPathWatchEvents(notifiableEvents);
notifiableEvents.clear();
if (key != null && !key.reset())
{ {
keys.remove(key); keys.remove(key);
if (keys.isEmpty()) if (keys.isEmpty())
@ -1052,6 +1257,54 @@ public class PathWatcher extends AbstractLifeCycle implements Runnable
} }
} }
/**
* Add an event reported by the WatchService to list of pending events
* that will be sent after their quiet time has expired.
*
* @param path
* @param event
*/
public void addToPendingList (Path path, PathWatchEvent event)
{
PathPendingEvents pending = pendingEvents.get(path);
//Are there already pending events for this path?
if (pending == null)
{
//No existing pending events, create pending list
pendingEvents.put(path,new PathPendingEvents(path, event));
}
else
{
//There are already some events pending for this path
pending.addEvent(event);
}
}
/**
* Whether or not to issue notifications for directories and files that
* already exist when the watcher starts.
*
* @param notify
*/
public void setNotifyExistingOnStart (boolean notify)
{
_notifyExistingOnStart = notify;
}
public boolean isNotifyExistingOnStart ()
{
return _notifyExistingOnStart;
}
/**
* Set the quiet time.
*
* @param duration
* @param unit
*/
public void setUpdateQuietTime(long duration, TimeUnit unit) public void setUpdateQuietTime(long duration, TimeUnit unit)
{ {
long desiredMillis = unit.toMillis(duration); long desiredMillis = unit.toMillis(duration);

View File

@ -61,7 +61,26 @@ public class PathWatcherDemo implements PathWatcher.Listener
public void run(List<Path> paths) throws Exception public void run(List<Path> paths) throws Exception
{ {
PathWatcher watcher = new PathWatcher(); PathWatcher watcher = new PathWatcher();
watcher.addListener(new PathWatcherDemo()); //watcher.addListener(new PathWatcherDemo());
watcher.addListener (new PathWatcher.EventListListener(){
@Override
public void onPathWatchEvents(List<PathWatchEvent> events)
{
if (events == null)
LOG.warn("Null events received");
if (events.isEmpty())
LOG.warn("Empty events received");
LOG.info("Bulk notification received");
for (PathWatchEvent e:events)
onPathWatchEvent(e);
}
});
//watcher.setNotifyExistingOnStart(false);
List<String> excludes = new ArrayList<>(); List<String> excludes = new ArrayList<>();
excludes.add("glob:*.bak"); // ignore backup files excludes.add("glob:*.bak"); // ignore backup files
@ -74,6 +93,7 @@ public class PathWatcherDemo implements PathWatcher.Listener
PathWatcher.Config config = new PathWatcher.Config(path); PathWatcher.Config config = new PathWatcher.Config(path);
config.addExcludeHidden(); config.addExcludeHidden();
config.addExcludes(excludes); config.addExcludes(excludes);
config.setRecurseDepth(4);
watcher.addDirectoryWatch(config); watcher.addDirectoryWatch(config);
} }
else else

View File

@ -60,7 +60,7 @@ public class PathWatcherTest
*/ */
public Map<String, List<PathWatchEventType>> events = new HashMap<>(); public Map<String, List<PathWatchEventType>> events = new HashMap<>();
public CountDownLatch finishedLatch = new CountDownLatch(1); public CountDownLatch finishedLatch;
private PathWatchEventType triggerType; private PathWatchEventType triggerType;
private Path triggerPath; private Path triggerPath;
@ -69,19 +69,28 @@ public class PathWatcherTest
this.baseDir = baseDir; this.baseDir = baseDir;
} }
@Override @Override
public void onPathWatchEvent(PathWatchEvent event) public void onPathWatchEvent(PathWatchEvent event)
{ {
synchronized (events) synchronized (events)
{ {
//if triggered by path
if (triggerPath != null) if (triggerPath != null)
{ {
if (triggerPath.equals(event.getPath()) && (event.getType() == triggerType)) if (triggerPath.equals(event.getPath()) && (event.getType() == triggerType))
{ {
LOG.debug("Encountered finish trigger: {} on {}",event.getType(),event.getPath()); LOG.debug("Encountered finish trigger: {} on {}",event.getType(),event.getPath());
finishedLatch.countDown(); finishedLatch.countDown();
} }
} }
else if (finishedLatch != null)
{
finishedLatch.countDown();
}
Path relativePath = this.baseDir.relativize(event.getPath()); Path relativePath = this.baseDir.relativize(event.getPath());
String key = relativePath.toString().replace(File.separatorChar,'/'); String key = relativePath.toString().replace(File.separatorChar,'/');
@ -151,9 +160,15 @@ public class PathWatcherTest
{ {
this.triggerPath = triggerPath; this.triggerPath = triggerPath;
this.triggerType = triggerType; this.triggerType = triggerType;
this.finishedLatch = new CountDownLatch(1);
LOG.debug("Setting finish trigger {} for path {}",triggerType,triggerPath); LOG.debug("Setting finish trigger {} for path {}",triggerType,triggerPath);
} }
public void setFinishTrigger (int count)
{
finishedLatch = new CountDownLatch(count);
}
/** /**
* Await the countdown latch on the finish trigger * Await the countdown latch on the finish trigger
* *
@ -167,9 +182,9 @@ public class PathWatcherTest
*/ */
public void awaitFinish(PathWatcher pathWatcher) throws IOException, InterruptedException public void awaitFinish(PathWatcher pathWatcher) throws IOException, InterruptedException
{ {
assertThat("Trigger Path must be set",triggerPath,notNullValue()); //assertThat("Trigger Path must be set",triggerPath,notNullValue());
assertThat("Trigger Type must be set",triggerType,notNullValue()); //assertThat("Trigger Type must be set",triggerType,notNullValue());
double multiplier = 8.0; double multiplier = 25.0;
long awaitMillis = (long)((double)pathWatcher.getUpdateQuietTimeMillis() * multiplier); long awaitMillis = (long)((double)pathWatcher.getUpdateQuietTimeMillis() * multiplier);
LOG.debug("Waiting for finish ({} ms)",awaitMillis); LOG.debug("Waiting for finish ({} ms)",awaitMillis);
assertThat("Timed Out (" + awaitMillis + "ms) waiting for capture to finish",finishedLatch.await(awaitMillis,TimeUnit.MILLISECONDS),is(true)); assertThat("Timed Out (" + awaitMillis + "ms) waiting for capture to finish",finishedLatch.await(awaitMillis,TimeUnit.MILLISECONDS),is(true));
@ -251,11 +266,11 @@ public class PathWatcherTest
*/ */
private static void awaitQuietTime(PathWatcher pathWatcher) throws InterruptedException private static void awaitQuietTime(PathWatcher pathWatcher) throws InterruptedException
{ {
double multiplier = 2.0; double multiplier = 5.0;
if (OS.IS_WINDOWS) if (OS.IS_WINDOWS)
{ {
// Microsoft Windows filesystem is too slow for a lower multiplier // Microsoft Windows filesystem is too slow for a lower multiplier
multiplier = 3.0; multiplier = 6.0;
} }
TimeUnit.MILLISECONDS.sleep((long)((double)pathWatcher.getUpdateQuietTimeMillis() * multiplier)); TimeUnit.MILLISECONDS.sleep((long)((double)pathWatcher.getUpdateQuietTimeMillis() * multiplier));
} }
@ -390,6 +405,7 @@ public class PathWatcherTest
// Add listener // Add listener
PathWatchEventCapture capture = new PathWatchEventCapture(dir); PathWatchEventCapture capture = new PathWatchEventCapture(dir);
capture.setFinishTrigger(5);
pathWatcher.addListener(capture); pathWatcher.addListener(capture);
// Add test dir configuration // Add test dir configuration
@ -409,7 +425,7 @@ public class PathWatcherTest
// Update web.xml // Update web.xml
Path webFile = dir.resolve("bar/WEB-INF/web.xml"); Path webFile = dir.resolve("bar/WEB-INF/web.xml");
capture.setFinishTrigger(webFile,MODIFIED); //capture.setFinishTrigger(webFile,MODIFIED);
updateFile(webFile,"Hello Update"); updateFile(webFile,"Hello Update");
// Delete war // Delete war