Issue #1200 Improve PathWatcher

This commit is contained in:
Greg Wilkins 2017-08-24 16:24:57 +10:00
parent 5a9e2ea25a
commit 3eadc5f1db
2 changed files with 274 additions and 109 deletions

View File

@ -39,8 +39,8 @@ import java.nio.file.WatchEvent.Kind;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.AbstractSet;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.EventListener;
import java.util.HashMap;
@ -51,7 +51,6 @@ import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Scanner;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
@ -71,7 +70,19 @@ import org.eclipse.jetty.util.log.Logger;
*/
public class PathWatcher extends AbstractLifeCycle implements Runnable
{
public static class Config
public static class PathMatcherSet extends HashSet<PathMatcher> implements Predicate<Path>
{
@Override
public boolean test(Path path)
{
for (PathMatcher pm: this)
if (pm.matches(path))
return true;
return false;
}
}
public static class Config implements Predicate<Path>
{
public static final int UNLIMITED_DEPTH = -9999;
@ -290,38 +301,28 @@ public class PathWatcher extends AbstractLifeCycle implements Runnable
return this.dir;
}
@Deprecated
public boolean isExcluded(Path dir) throws IOException
public boolean test(Path path)
{
if (excludeHidden)
try
{
if (Files.isHidden(dir))
if (excludeHidden && Files.isHidden(path))
{
if (NOISY_LOG.isDebugEnabled())
{
NOISY_LOG.debug("isExcluded [Hidden] on {}",dir);
}
return true;
VERBOSE_LOG.debug("test({}) -> [Hidden]", path);
return false;
}
}
boolean matched = ((PathMatcherSet)includeExclude.getExcluded()).test(dir);
if (NOISY_LOG.isDebugEnabled())
{
NOISY_LOG.debug("isExcluded [{}] on {}",matched,dir);
}
return matched;
}
boolean matched = includeExclude.test(path);
@Deprecated
public boolean isIncluded(Path dir)
{
boolean matched = ((PathMatcherSet)includeExclude.getIncluded()).test(dir);
if (NOISY_LOG.isDebugEnabled())
{
NOISY_LOG.debug("isIncluded [{}] on {}",matched,dir);
if (VERBOSE_LOG.isDebugEnabled())
{
VERBOSE_LOG.debug("test({}) -> {}", path, matched);
}
return matched;
}
catch(IOException e)
{
throw new RuntimeException(e);
}
return matched;
}
public boolean matches(Path path)
@ -379,9 +380,9 @@ public class PathWatcher extends AbstractLifeCycle implements Runnable
Path root = path.getRoot();
if (root != null)
{
if (NOISY_LOG.isDebugEnabled())
if (VERBOSE_LOG.isDebugEnabled())
{
NOISY_LOG.debug("Path: {} -> Root: {}",path,root);
VERBOSE_LOG.debug("Path: {} -> Root: {}", path, root);
}
for (char c : root.toString().toCharArray())
{
@ -485,21 +486,18 @@ public class PathWatcher extends AbstractLifeCycle implements Runnable
// - generate pending add event (iff notifiable and matches patterns)
// - else stop visiting this dir
if (!base.isExcluded(dir))
if (base.test(dir))
{
if (base.isIncluded(dir))
if (watcher.isNotifiable())
{
if (watcher.isNotifiable())
// Directory is specifically included in PathMatcher, then
// it should be notified as such to interested listeners
PathWatchEvent event = new PathWatchEvent(dir,PathWatchEventType.ADDED);
if (LOG.isDebugEnabled())
{
// Directory is specifically included in PathMatcher, then
// it should be notified as such to interested listeners
PathWatchEvent event = new PathWatchEvent(dir,PathWatchEventType.ADDED);
if (LOG.isDebugEnabled())
{
LOG.debug("Pending {}",event);
}
watcher.addToPendingList(dir, event);
LOG.debug("Pending {}",event);
}
watcher.addToPendingList(dir, event);
}
//Register the dir with the watcher if it is:
@ -679,7 +677,7 @@ public class PathWatcher extends AbstractLifeCycle implements Runnable
@Override
public String toString()
{
return String.format("PathWatchEvent[%s|%s]",type,path);
return String.format("PathWatchEvent[%8s|%s]",type,path);
}
}
@ -825,7 +823,7 @@ public class PathWatcher extends AbstractLifeCycle implements Runnable
/**
* super noisy debug logging
*/
private static final Logger NOISY_LOG = Log.getLogger(PathWatcher.class.getName() + ".Noisy");
private static final Logger VERBOSE_LOG = Log.getLogger(PathWatcher.class.getName() + ".PathWatcherVerbose");
@SuppressWarnings("unchecked")
protected static <T> WatchEvent<T> cast(WatchEvent<?> event)
@ -860,7 +858,12 @@ public class PathWatcher extends AbstractLifeCycle implements Runnable
public PathWatcher()
{
}
public Collection<Config> getConfigs()
{
return configs;
}
/**
* Request watch on a the given path (either file or dir)
* using all Config defaults. In the case of a dir,
@ -1225,16 +1228,16 @@ public class PathWatcher extends AbstractLifeCycle implements Runnable
//If no pending events, wait forever for new events
if (pendingEvents.isEmpty())
{
if (NOISY_LOG.isDebugEnabled())
NOISY_LOG.debug("Waiting for take()");
if (VERBOSE_LOG.isDebugEnabled())
VERBOSE_LOG.debug("Waiting for take()");
key = watchService.take();
}
else
{
//There are existing events that might be ready to go,
//only wait as long as the quiet time for any new events
if (NOISY_LOG.isDebugEnabled())
NOISY_LOG.debug("Waiting for poll({}, {})",updateQuietTimeDuration,updateQuietTimeUnit);
if (VERBOSE_LOG.isDebugEnabled())
VERBOSE_LOG.debug("Waiting for poll({}, {})", updateQuietTimeDuration, updateQuietTimeUnit);
key = watchService.poll(updateQuietTimeDuration,updateQuietTimeUnit);
@ -1425,15 +1428,4 @@ public class PathWatcher extends AbstractLifeCycle implements Runnable
}
public static class PathMatcherSet extends HashSet<PathMatcher> implements Predicate<Path>
{
@Override
public boolean test(Path path)
{
for (PathMatcher pm: this)
if (pm.matches(path))
return true;
return false;
}
}
}

View File

@ -26,8 +26,11 @@ import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.CopyOption;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.attribute.FileTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
@ -46,7 +49,6 @@ import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
@Ignore("Disabled due to behavioral differences in various FileSystems (hard to write a single testcase that works in all scenarios)")
public class PathWatcherTest
{
public static class PathWatchEventCapture implements PathWatcher.Listener
@ -76,27 +78,17 @@ public class PathWatcherTest
events.clear();
}
public void reset(int count)
{
setFinishTrigger(count);
events.clear();
}
@Override
public void onPathWatchEvent(PathWatchEvent event)
{
synchronized (events)
{
//if triggered by path
if (triggerPath != null)
{
if (triggerPath.equals(event.getPath()) && (event.getType() == triggerType))
{
LOG.debug("Encountered finish trigger: {} on {}",event.getType(),event.getPath());
finishedLatch.countDown();
}
}
else if (finishedLatch != null)
{
finishedLatch.countDown();
}
Path relativePath = this.baseDir.relativize(event.getPath());
String key = relativePath.toString().replace(File.separatorChar,'/');
@ -108,6 +100,21 @@ public class PathWatcherTest
types.add(event.getType());
this.events.put(key,types);
LOG.debug("Captured Event: {} | {}",event.getType(),key);
//if triggered by path
if (triggerPath != null)
{
if (triggerPath.equals(event.getPath()) && (event.getType() == triggerType))
{
LOG.debug("Encountered finish trigger: {} on {}",event.getType(),event.getPath());
finishedLatch.countDown();
}
}
else if (finishedLatch != null)
{
finishedLatch.countDown();
}
}
}
@ -122,13 +129,21 @@ public class PathWatcherTest
*/
public void assertEvents(Map<String, PathWatchEventType[]> expectedEvents)
{
assertThat("Event match (file|diretory) count",this.events.size(),is(expectedEvents.size()));
for (Map.Entry<String, PathWatchEventType[]> entry : expectedEvents.entrySet())
try
{
String relativePath = entry.getKey();
PathWatchEventType[] expectedTypes = entry.getValue();
assertEvents(relativePath,expectedTypes);
assertThat("Event match (file|diretory) count", this.events.size(), is(expectedEvents.size()));
for (Map.Entry<String, PathWatchEventType[]> entry : expectedEvents.entrySet())
{
String relativePath = entry.getKey();
PathWatchEventType[] expectedTypes = entry.getValue();
assertEvents(relativePath, expectedTypes);
}
}
catch(Throwable th)
{
System.err.println(this.events);
throw th;
}
}
@ -228,39 +243,46 @@ public class PathWatcherTest
* @throws InterruptedException
* if sleep between writes was interrupted
*/
private void updateFileOverTime(Path path, int fileSize, int timeDuration, TimeUnit timeUnit) throws IOException, InterruptedException
private void updateFileOverTime(Path path, int fileSize, int timeDuration, TimeUnit timeUnit)
{
// how long to sleep between writes
int sleepMs = 100;
// how many millis to spend writing entire file size
long totalMs = timeUnit.toMillis(timeDuration);
// how many write chunks to write
int writeCount = (int)((int)totalMs / (int)sleepMs);
// average chunk buffer
int chunkBufLen = fileSize / writeCount;
byte chunkBuf[] = new byte[chunkBufLen];
Arrays.fill(chunkBuf,(byte)'x');
try (FileOutputStream out = new FileOutputStream(path.toFile()))
try
{
int left = fileSize;
// how long to sleep between writes
int sleepMs = 100;
while (left > 0)
// how many millis to spend writing entire file size
long totalMs = timeUnit.toMillis(timeDuration);
// how many write chunks to write
int writeCount = (int)((int)totalMs / (int)sleepMs);
// average chunk buffer
int chunkBufLen = fileSize / writeCount;
byte chunkBuf[] = new byte[chunkBufLen];
Arrays.fill(chunkBuf, (byte)'x');
try (FileOutputStream out = new FileOutputStream(path.toFile()))
{
int len = Math.min(left,chunkBufLen);
out.write(chunkBuf,0,len);
left -= chunkBufLen;
out.flush();
out.getChannel().force(true);
// Force file to actually write to disk.
// Skipping any sort of filesystem caching of the write
out.getFD().sync();
TimeUnit.MILLISECONDS.sleep(sleepMs);
int left = fileSize;
while (left > 0)
{
int len = Math.min(left, chunkBufLen);
out.write(chunkBuf, 0, len);
left -= chunkBufLen;
out.flush();
out.getChannel().force(true);
// Force file to actually write to disk.
// Skipping any sort of filesystem caching of the write
out.getFD().sync();
TimeUnit.MILLISECONDS.sleep(sleepMs);
}
}
}
catch (Exception e)
{
throw new RuntimeException(e);
}
}
/**
@ -291,6 +313,157 @@ public class PathWatcherTest
@Rule
public TestingDir testdir = new TestingDir();
@Test
public void testSequence() throws Exception
{
Path dir = testdir.getEmptyPathDir();
// Files we are interested in
Files.createFile(dir.resolve("file0"));
Files.createDirectories(dir.resolve("subdir0/subsubdir0"));
Files.createFile(dir.resolve("subdir0/fileA"));
Files.createFile(dir.resolve("subdir0/subsubdir0/unseen"));
PathWatcher pathWatcher = new PathWatcher();
pathWatcher.setUpdateQuietTime(300,TimeUnit.MILLISECONDS);
// Add listener
PathWatchEventCapture capture = new PathWatchEventCapture(dir);
pathWatcher.addListener(capture);
// Add test dir configuration
PathWatcher.Config config = new PathWatcher.Config(dir);
config.setRecurseDepth(1);
pathWatcher.watch(config);
try
{
Map<String, PathWatchEventType[]> expected = new HashMap<>();
// Check initial scan events
capture.setFinishTrigger(5);
pathWatcher.start();
expected.put("",new PathWatchEventType[] { ADDED });
expected.put("file0",new PathWatchEventType[] { ADDED });
expected.put("subdir0",new PathWatchEventType[] { ADDED });
expected.put("subdir0/fileA",new PathWatchEventType[] { ADDED });
expected.put("subdir0/subsubdir0",new PathWatchEventType[] { ADDED });
capture.finishedLatch.await(5,TimeUnit.SECONDS);
capture.assertEvents(expected);
Thread.sleep(500);
capture.assertEvents(expected);
// Check adding files
capture.reset(2);
expected.clear();
Files.createFile(dir.resolve("subdir0/subsubdir0/toodeep"));
//TODO expected.put("subdir0/subsubdir0",new PathWatchEventType[] { MODIFIED });
Files.createFile(dir.resolve("file1"));
expected.put("file1",new PathWatchEventType[] { ADDED });
Files.createFile(dir.resolve("subdir0/fileB"));
expected.put("subdir0/fileB",new PathWatchEventType[] { ADDED });
capture.finishedLatch.await(5,TimeUnit.SECONDS);
capture.assertEvents(expected);
Thread.sleep(500);
capture.assertEvents(expected);
// check modify directory
capture.reset(1);
expected.clear();
Files.setLastModifiedTime(dir.resolve("subdir0"), FileTime.fromMillis(System.currentTimeMillis()));
expected.put("subdir0",new PathWatchEventType[] { MODIFIED });
capture.finishedLatch.await(5,TimeUnit.SECONDS);
capture.assertEvents(expected);
Thread.sleep(500);
capture.assertEvents(expected);
// Check modify files
capture.reset(2);
expected.clear();
updateFile(dir.resolve("subdir0/subsubdir0/toodeep"),"New Contents");
updateFile(dir.resolve("file1"),"New Contents");
expected.put("file1",new PathWatchEventType[] { MODIFIED });
updateFile(dir.resolve("subdir0/fileB"),"New Contents");
capture.finishedLatch.await(5,TimeUnit.SECONDS);
capture.setFinishTrigger(1);
updateFile(dir.resolve("subdir0/fileB"),"Newer Contents");
expected.put("subdir0/fileB",new PathWatchEventType[] { MODIFIED, MODIFIED });
capture.finishedLatch.await(5,TimeUnit.SECONDS);
capture.assertEvents(expected);
Thread.sleep(500);
capture.assertEvents(expected);
// Check slow modification
capture.reset(1);
expected.clear();
long start = System.nanoTime();
new Thread(()->{updateFileOverTime(dir.resolve("file1"),20,2,TimeUnit.SECONDS);}).start();
expected.put("file1",new PathWatchEventType[] { MODIFIED });
capture.finishedLatch.await(5,TimeUnit.SECONDS);
long end = System.nanoTime();
capture.assertEvents(expected);
assertThat(end-start,greaterThan(TimeUnit.SECONDS.toNanos(2)));
Thread.sleep(500);
capture.assertEvents(expected);
// Check slow add
capture.reset(2);
expected.clear();
start = System.nanoTime();
new Thread(()->{updateFileOverTime(dir.resolve("file2"),20,2,TimeUnit.SECONDS);}).start();
// TODO expected.put("file2",new PathWatchEventType[] { ADDED });
expected.put("file2",new PathWatchEventType[] { ADDED, MODIFIED });
capture.finishedLatch.await(5,TimeUnit.SECONDS);
end = System.nanoTime();
capture.assertEvents(expected);
assertThat(end-start,greaterThan(TimeUnit.SECONDS.toNanos(2)));
Thread.sleep(500);
capture.assertEvents(expected);
// Check move directory
capture.reset(5);
expected.clear();
Files.move(dir.resolve("subdir0"), dir.resolve("subdir1"), StandardCopyOption.ATOMIC_MOVE);
expected.put("subdir0",new PathWatchEventType[] { DELETED });
// TODO expected.put("subdir0/fileA",new PathWatchEventType[] { DELETED });
// TODO expected.put("subdir0/subsubdir0",new PathWatchEventType[] { DELETED });
expected.put("subdir1",new PathWatchEventType[] { ADDED });
expected.put("subdir1/fileA",new PathWatchEventType[] { ADDED });
expected.put("subdir1/fileB",new PathWatchEventType[] { ADDED });
expected.put("subdir1/subsubdir0",new PathWatchEventType[] { ADDED });
capture.finishedLatch.await(5,TimeUnit.SECONDS);
capture.assertEvents(expected);
Thread.sleep(500);
capture.assertEvents(expected);
// Check delete file
capture.reset(2);
expected.clear();
Files.delete(dir.resolve("file1"));
expected.put("file1",new PathWatchEventType[] { DELETED });
Files.delete(dir.resolve("file2"));
expected.put("file2",new PathWatchEventType[] { DELETED });
capture.finishedLatch.await(5,TimeUnit.SECONDS);
capture.assertEvents(expected);
Thread.sleep(500);
capture.assertEvents(expected);
}
finally
{
pathWatcher.stop();
}
}
@Test
public void testConfig_ShouldRecurse_0() throws IOException
{