469241 Make PathWatcher stoppable and restartable

This commit is contained in:
Jan Bartel 2015-06-04 17:09:26 +10:00
parent 6c2780b9cf
commit 30e14fd256
4 changed files with 253 additions and 82 deletions

View File

@ -333,7 +333,7 @@ public class PropertyUserStore extends AbstractLifeCycle implements PathWatcher.
if ( isHotReload() && (_configPath != null) )
{
this.pathWatcher = new PathWatcher();
this.pathWatcher.addFileWatch(_configPath);
this.pathWatcher.watch(_configPath);
this.pathWatcher.addListener(this);
this.pathWatcher.setNotifyExistingOnStart(false);
this.pathWatcher.start();
@ -363,6 +363,7 @@ public class PropertyUserStore extends AbstractLifeCycle implements PathWatcher.
super.doStop();
if (this.pathWatcher != null)
this.pathWatcher.stop();
this.pathWatcher = null;
}
/**

View File

@ -359,7 +359,7 @@ public class PathWatcher extends AbstractLifeCycle implements Runnable
/**
* Set the recurse depth for the directory scanning.
* <p>
* 0 indicates no recursion, 1 is only one directory deep, and so on.
* -999 indicates arbitrarily deep recursion, 0 indicates no recursion, 1 is only one directory deep, and so on.
*
* @param depth
* the number of directories deep to recurse
@ -368,6 +368,8 @@ public class PathWatcher extends AbstractLifeCycle implements Runnable
{
this.recurseDepth = depth;
}
/**
* Determine if the provided child directory should be recursed into based on the configured {@link #setRecurseDepth(int)}
@ -543,8 +545,8 @@ public class PathWatcher extends AbstractLifeCycle implements Runnable
// - a child dir and its depth is within the limits
if ((base.getPath().equals(dir)&& (base.isRecurseDepthUnlimited() || base.getRecurseDepth() >= 0)) || base.shouldRecurseDirectory(dir))
return FileVisitResult.CONTINUE;
else
return FileVisitResult.SKIP_SUBTREE;
else
return FileVisitResult.SKIP_SUBTREE;
}
@Override
@ -577,11 +579,21 @@ public class PathWatcher extends AbstractLifeCycle implements Runnable
void onPathWatchEvent(PathWatchEvent event);
}
/**
* EventListListener
*
* Listener that reports accumulated events in one shot
*/
public static interface EventListListener extends EventListener
{
void onPathWatchEvents(List<PathWatchEvent> events);
}
/**
* PathWatchEvent
*
* Represents a file event. Reported to registered listeners.
*/
public static class PathWatchEvent
{
private final Path path;
@ -590,6 +602,10 @@ public class PathWatcher extends AbstractLifeCycle implements Runnable
/**
* @param path
* @param type
*/
public PathWatchEvent(Path path, PathWatchEventType type)
{
this.path = path;
@ -598,6 +614,10 @@ public class PathWatcher extends AbstractLifeCycle implements Runnable
}
/**
* @param path
* @param event
*/
public PathWatchEvent(Path path, WatchEvent<Path> event)
{
this.path = path;
@ -620,6 +640,9 @@ public class PathWatcher extends AbstractLifeCycle implements Runnable
}
}
/**
* @see java.lang.Object#equals(java.lang.Object)
*/
@Override
public boolean equals(Object obj)
{
@ -656,28 +679,43 @@ public class PathWatcher extends AbstractLifeCycle implements Runnable
/**
* @return
*/
public Path getPath()
{
return path;
}
/**
* @return
*/
public PathWatchEventType getType()
{
return type;
}
/**
* @param num
*/
public void incrementCount(int num)
{
count += num;
}
/**
* @return
*/
public int getCount()
{
return count;
}
/**
* @see java.lang.Object#hashCode()
*/
@Override
public int hashCode()
{
@ -688,6 +726,9 @@ public class PathWatcher extends AbstractLifeCycle implements Runnable
return result;
}
/**
* @see java.lang.Object#toString()
*/
@Override
public String toString()
{
@ -695,6 +736,15 @@ public class PathWatcher extends AbstractLifeCycle implements Runnable
}
}
/**
* PathPendingEvents
*
* For a given path, a list of events that are awaiting the
* quiet time. The list is in the order that the event were
* received from the WatchService
*/
public static class PathPendingEvents
{
private Path _path;
@ -796,6 +846,11 @@ public class PathWatcher extends AbstractLifeCycle implements Runnable
}
/**
* PathWatchEventType
*
* Type of an event
*/
public static enum PathWatchEventType
{
ADDED, DELETED, MODIFIED, UNKNOWN;
@ -830,12 +885,14 @@ public class PathWatcher extends AbstractLifeCycle implements Runnable
}
private static final WatchEvent.Kind<?> WATCH_EVENT_KINDS[] = { ENTRY_CREATE, ENTRY_DELETE, ENTRY_MODIFY };
private final WatchService watcher;
private final WatchEvent.Modifier watchModifiers[];
private final boolean nativeWatchService;
private WatchService watchService;
private WatchEvent.Modifier watchModifiers[];
private boolean nativeWatchService;
private Map<WatchKey, Config> keys = new HashMap<>();
private List<EventListener> listeners = new ArrayList<>();
private List<Config> configs = new ArrayList<>();
/**
* Update Quiet Time - set to 1000 ms as default (a lower value in Windows is not supported)
@ -854,69 +911,22 @@ public class PathWatcher extends AbstractLifeCycle implements Runnable
*/
public PathWatcher() throws IOException
{
this.watcher = FileSystems.getDefault().newWatchService();
WatchEvent.Modifier modifiers[] = null;
boolean nativeService = true;
// Try to determine native behavior
// See http://stackoverflow.com/questions/9588737/is-java-7-watchservice-slow-for-anyone-else
try
{
ClassLoader cl = Thread.currentThread().getContextClassLoader();
Class<?> pollingWatchServiceClass = Class.forName("sun.nio.fs.PollingWatchService",false,cl);
if (pollingWatchServiceClass.isAssignableFrom(this.watcher.getClass()))
{
nativeService = false;
LOG.info("Using Non-Native Java {}",pollingWatchServiceClass.getName());
Class<?> c = Class.forName("com.sun.nio.file.SensitivityWatchEventModifier");
Field f = c.getField("HIGH");
modifiers = new WatchEvent.Modifier[]
{
(WatchEvent.Modifier)f.get(c)
};
}
}
catch (Throwable t)
{
// Unknown JVM environment, assuming native.
LOG.ignore(t);
}
this.watchModifiers = modifiers;
this.nativeWatchService = nativeService;
}
/**
* Add a directory to watch with customized watch parameters.
*
* @param baseDir
* the dir to watch with its customized config
* @throws IOException
* if unable to setup the directory watch
*/
public void addDirectoryWatch(final Config baseDir) throws IOException
{
if (LOG.isDebugEnabled())
{
LOG.debug("Watching directory {}",baseDir);
}
Files.walkFileTree(baseDir.getPath(), new DepthLimitedFileVisitor(this, baseDir));
}
/**
* Add a file or directory to watch for changes.
* Request watch on a the given path (either file or dir)
* using all Config defaults. In the case of a dir,
* the default is not to recurse into subdirs for watching.
*
* @param file
* @throws IOException
*/
public void addFileWatch(final Path file) throws IOException
public void watch (final Path file)
{
if (LOG.isDebugEnabled())
{
LOG.debug("Watching file {}",file);
}
//Make a config for the dir above it and
//include a match only for the given path
//using all defaults for the configuration
Path abs = file;
if (!abs.isAbsolute())
{
@ -927,9 +937,43 @@ public class PathWatcher extends AbstractLifeCycle implements Runnable
config.addIncludeGlobRelative("");
// the include for the file
config.addIncludeGlobRelative(file.getFileName().toString());
addDirectoryWatch(config);
watch(config);
}
/**
* Request watch on a path with custom Config
* provided.
*
* @param config
*/
public void watch (final Config config)
{
//Add a custom config
configs.add(config);
}
/**
* Register path in the config with the file watch service,
* walking the tree if it happens to be a directory.
*
* @param baseDir
* @throws IOException
*/
protected void prepareConfig (final Config baseDir) throws IOException
{
if (LOG.isDebugEnabled())
{
LOG.debug("Watching directory {}",baseDir);
}
Files.walkFileTree(baseDir.getPath(), new DepthLimitedFileVisitor(this, baseDir));
}
/**
* Add a listener for changes the watcher notices.
*
@ -978,12 +1022,24 @@ public class PathWatcher extends AbstractLifeCycle implements Runnable
@Override
protected void doStart() throws Exception
{
//create a new watchservice
createWatchService();
//ensure setting of quiet time is appropriate now we have a watcher
setUpdateQuietTime(getUpdateQuietTimeMillis(), TimeUnit.MILLISECONDS);
// Register all watched paths, walking dir hierarchies as needed, possibly generating
// fake add events if notifyExistingOnStart is true
for (Config c:configs)
prepareConfig(c);
// Start Thread for watcher take/pollKeys loop
StringBuilder threadId = new StringBuilder();
threadId.append("PathWatcher-Thread");
appendConfigId(threadId);
thread = new Thread(this,threadId.toString());
thread.setDaemon(true);
thread.start();
super.doStart();
}
@ -994,10 +1050,56 @@ public class PathWatcher extends AbstractLifeCycle implements Runnable
@Override
protected void doStop() throws Exception
{
watcher.close();
if (watchService != null)
watchService.close(); //will invalidate registered watch keys, interrupt thread in take or poll
watchService = null;
thread = null;
keys.clear();
pendingEvents.clear();
super.doStop();
}
/**
* Create a fresh WatchService and determine if it is a
* native implementation or not.
*
* @throws IOException
*/
private void createWatchService () throws IOException
{
//create a watch service
this.watchService = FileSystems.getDefault().newWatchService();
WatchEvent.Modifier modifiers[] = null;
boolean nativeService = true;
// Try to determine native behavior
// See http://stackoverflow.com/questions/9588737/is-java-7-watchservice-slow-for-anyone-else
try
{
ClassLoader cl = Thread.currentThread().getContextClassLoader();
Class<?> pollingWatchServiceClass = Class.forName("sun.nio.fs.PollingWatchService",false,cl);
if (pollingWatchServiceClass.isAssignableFrom(this.watchService.getClass()))
{
nativeService = false;
LOG.info("Using Non-Native Java {}",pollingWatchServiceClass.getName());
Class<?> c = Class.forName("com.sun.nio.file.SensitivityWatchEventModifier");
Field f = c.getField("HIGH");
modifiers = new WatchEvent.Modifier[]
{
(WatchEvent.Modifier)f.get(c)
};
}
}
catch (Throwable t)
{
// Unknown JVM environment, assuming native.
LOG.ignore(t);
}
this.watchModifiers = modifiers;
this.nativeWatchService = nativeService;
}
/**
* Check to see if the watcher is in a state where it should generate
* watch events to the listeners. Used to determine if watcher should generate
@ -1086,11 +1188,11 @@ public class PathWatcher extends AbstractLifeCycle implements Runnable
LOG.debug("Registering watch on {}",dir);
if(watchModifiers != null) {
// Java Watcher
WatchKey key = dir.register(watcher,WATCH_EVENT_KINDS,watchModifiers);
WatchKey key = dir.register(watchService,WATCH_EVENT_KINDS,watchModifiers);
keys.put(key,root.asSubConfig(dir));
} else {
// Native Watcher
WatchKey key = dir.register(watcher,WATCH_EVENT_KINDS);
WatchKey key = dir.register(watchService,WATCH_EVENT_KINDS);
keys.put(key,root.asSubConfig(dir));
}
}
@ -1133,7 +1235,7 @@ public class PathWatcher extends AbstractLifeCycle implements Runnable
// Start the java.nio watching
if (LOG.isDebugEnabled())
{
LOG.debug("Starting java.nio file watching with {}",watcher);
LOG.debug("Starting java.nio file watching with {}",watchService);
}
while (true)
@ -1147,8 +1249,7 @@ public class PathWatcher extends AbstractLifeCycle implements Runnable
{
if (NOISY_LOG.isDebugEnabled())
NOISY_LOG.debug("Waiting for take()");
key = watcher.take();
key = watchService.take();
}
else
{
@ -1157,7 +1258,7 @@ public class PathWatcher extends AbstractLifeCycle implements Runnable
if (NOISY_LOG.isDebugEnabled())
NOISY_LOG.debug("Waiting for poll({}, {})",updateQuietTimeDuration,updateQuietTimeUnit);
key = watcher.poll(updateQuietTimeDuration,updateQuietTimeUnit);
key = watchService.poll(updateQuietTimeDuration,updateQuietTimeUnit);
//If no new events its safe to process the pendings
if (key == null)
@ -1230,7 +1331,7 @@ public class PathWatcher extends AbstractLifeCycle implements Runnable
{
try
{
addDirectoryWatch(config.asSubConfig(child));
prepareConfig(config.asSubConfig(child));
}
catch (IOException e)
{
@ -1316,9 +1417,9 @@ public class PathWatcher extends AbstractLifeCycle implements Runnable
{
long desiredMillis = unit.toMillis(duration);
if (!this.nativeWatchService && (desiredMillis < 5000))
if (watchService != null && !this.nativeWatchService && (desiredMillis < 5000))
{
LOG.warn("Quiet Time is too low for non-native WatchService [{}]: {} < 5000 ms (defaulting to 5000 ms)",watcher.getClass().getName(),desiredMillis);
LOG.warn("Quiet Time is too low for non-native WatchService [{}]: {} < 5000 ms (defaulting to 5000 ms)",watchService.getClass().getName(),desiredMillis);
this.updateQuietTimeDuration = 5000;
this.updateQuietTimeUnit = TimeUnit.MILLISECONDS;
return;

View File

@ -80,7 +80,7 @@ public class PathWatcherDemo implements PathWatcher.Listener
});
//watcher.setNotifyExistingOnStart(false);
watcher.setNotifyExistingOnStart(false);
List<String> excludes = new ArrayList<>();
excludes.add("glob:*.bak"); // ignore backup files
@ -94,14 +94,16 @@ public class PathWatcherDemo implements PathWatcher.Listener
config.addExcludeHidden();
config.addExcludes(excludes);
config.setRecurseDepth(4);
watcher.addDirectoryWatch(config);
watcher.watch(config);
}
else
{
watcher.addFileWatch(path);
watcher.watch(path);
}
}
watcher.start();
Thread.currentThread().join();
}
@Override

View File

@ -36,6 +36,8 @@ import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.swing.event.EventListenerList;
import org.eclipse.jetty.toolchain.test.OS;
import org.eclipse.jetty.toolchain.test.TestingDir;
import org.eclipse.jetty.util.PathWatcher.PathWatchEvent;
@ -46,7 +48,7 @@ import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
@Ignore("Disabled due to behavioral differences in various FileSystems (hard to write a single testcase that works in all scenarios)")
//@Ignore("Disabled due to behavioral differences in various FileSystems (hard to write a single testcase that works in all scenarios)")
public class PathWatcherTest
{
public static class PathWatchEventCapture implements PathWatcher.Listener
@ -60,6 +62,7 @@ public class PathWatcherTest
*/
public Map<String, List<PathWatchEventType>> events = new HashMap<>();
public int latchCount = 1;
public CountDownLatch finishedLatch;
private PathWatchEventType triggerType;
private Path triggerPath;
@ -69,7 +72,11 @@ public class PathWatcherTest
this.baseDir = baseDir;
}
public void reset()
{
finishedLatch = new CountDownLatch(latchCount);
events.clear();
}
@Override
public void onPathWatchEvent(PathWatchEvent event)
@ -160,13 +167,15 @@ public class PathWatcherTest
{
this.triggerPath = triggerPath;
this.triggerType = triggerType;
this.latchCount = 1;
this.finishedLatch = new CountDownLatch(1);
LOG.debug("Setting finish trigger {} for path {}",triggerType,triggerPath);
}
public void setFinishTrigger (int count)
{
finishedLatch = new CountDownLatch(count);
latchCount = count;
finishedLatch = new CountDownLatch(latchCount);
}
/**
@ -350,6 +359,64 @@ public class PathWatcherTest
assertThat("Config.recurse[1].shouldRecurse[./a]",config.shouldRecurseDirectory(dir.resolve("a")),is(true));
assertThat("Config.recurse[1].shouldRecurse[./]",config.shouldRecurseDirectory(dir),is(true));
}
@Test
public void testRestart() throws Exception
{
Path dir = testdir.getEmptyDir().toPath();
Files.createDirectories(dir.resolve("b/c"));
Files.createFile(dir.resolve("a.txt"));
Files.createFile(dir.resolve("b.txt"));
PathWatcher pathWatcher = new PathWatcher();
pathWatcher.setNotifyExistingOnStart(true);
pathWatcher.setUpdateQuietTime(500,TimeUnit.MILLISECONDS);
// Add listener
PathWatchEventCapture capture = new PathWatchEventCapture(dir);
capture.setFinishTrigger(2);
pathWatcher.addListener(capture);
PathWatcher.Config config = new PathWatcher.Config(dir);
config.setRecurseDepth(PathWatcher.Config.UNLIMITED_DEPTH);
config.addIncludeGlobRelative("*.txt");
pathWatcher.watch(config);
try
{
pathWatcher.start();
// Let quiet time do its thing
awaitQuietTime(pathWatcher);
Map<String, PathWatchEventType[]> expected = new HashMap<>();
expected.put("a.txt",new PathWatchEventType[] {ADDED});
expected.put("b.txt",new PathWatchEventType[] {ADDED});
capture.assertEvents(expected);
//stop it
pathWatcher.stop();
capture.reset();
Thread.currentThread().sleep(1000);
pathWatcher.start();
awaitQuietTime(pathWatcher);
capture.assertEvents(expected);
}
finally
{
pathWatcher.stop();
}
}
/**
* When starting up the PathWatcher, the events should occur
@ -389,7 +456,7 @@ public class PathWatcherTest
baseDirConfig.addExcludeHidden();
baseDirConfig.addIncludeGlobRelative("*.war");
baseDirConfig.addIncludeGlobRelative("*/WEB-INF/web.xml");
pathWatcher.addDirectoryWatch(baseDirConfig);
pathWatcher.watch(baseDirConfig);
try
{
@ -435,7 +502,7 @@ public class PathWatcherTest
baseDirConfig.addExcludeHidden();
baseDirConfig.addIncludeGlobRelative("*.war");
baseDirConfig.addIncludeGlobRelative("*/WEB-INF/web.xml");
pathWatcher.addDirectoryWatch(baseDirConfig);
pathWatcher.watch(baseDirConfig);
try
{
@ -495,7 +562,7 @@ public class PathWatcherTest
baseDirConfig.addExcludeHidden();
baseDirConfig.addIncludeGlobRelative("*.war");
baseDirConfig.addIncludeGlobRelative("*/WEB-INF/web.xml");
pathWatcher.addDirectoryWatch(baseDirConfig);
pathWatcher.watch(baseDirConfig);
try
{
@ -559,7 +626,7 @@ public class PathWatcherTest
baseDirConfig.addExcludeHidden();
baseDirConfig.addIncludeGlobRelative("*.war");
baseDirConfig.addIncludeGlobRelative("*/WEB-INF/web.xml");
pathWatcher.addDirectoryWatch(baseDirConfig);
pathWatcher.watch(baseDirConfig);
try
{