NIFI-12019 bugfix for SynchronousFileWatcher time check interval

Signed-off-by: Matt Burgess <mattyb149@apache.org>

This closes #7664
This commit is contained in:
Mike Moser 2023-09-01 20:03:39 +00:00 committed by Matt Burgess
parent 25a3cd335c
commit 1449325027
2 changed files with 14 additions and 21 deletions

View File

@ -59,7 +59,7 @@ public class SynchronousFileWatcher {
/**
* Checks if the file has been updated according to the configured {@link UpdateMonitor} and resets the state
*
* @return true if updated; false otherwise
* @return true if updated; false if not updated or if not yet time to check
* @throws IOException if failure occurs checking for changes
*/
public boolean checkAndReset() throws IOException {
@ -77,21 +77,16 @@ public class SynchronousFileWatcher {
private boolean checkForUpdate() throws IOException {
if (resourceLock.tryLock()) {
try {
final StateWrapper wrapper = lastState.get();
final Object oldState = lastState.get().getState();
final Object newState = monitor.getCurrentState(path);
if (newState == null && wrapper.getState() == null) {
if (newState == null && oldState == null) {
return false;
}
if (newState == null || wrapper.getState() == null) {
lastState.set(new StateWrapper(newState));
if (newState == null || oldState == null) {
return true;
}
final boolean unmodified = newState.equals(wrapper.getState());
if (!unmodified) {
lastState.set(new StateWrapper(newState));
}
return !unmodified;
return !newState.equals(oldState);
} finally {
resourceLock.unlock();
}

View File

@ -21,7 +21,6 @@ import org.junit.jupiter.api.Test;
import java.io.ByteArrayInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
@ -33,28 +32,27 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
public class TestSynchronousFileWatcher {
@Test
public void testIt() throws UnsupportedEncodingException, IOException, InterruptedException {
public void testIt() throws IOException, InterruptedException {
final Path path = Paths.get("target/1.txt");
Files.copy(new ByteArrayInputStream("Hello, World!".getBytes("UTF-8")), path, StandardCopyOption.REPLACE_EXISTING);
final UpdateMonitor monitor = new DigestUpdateMonitor();
final SynchronousFileWatcher watcher = new SynchronousFileWatcher(path, monitor, 10L);
final SynchronousFileWatcher watcher = new SynchronousFileWatcher(path, monitor, 40L);
assertFalse(watcher.checkAndReset());
Thread.sleep(30L);
Thread.sleep(41L);
assertFalse(watcher.checkAndReset());
final FileOutputStream fos = new FileOutputStream(path.toFile());
try {
try (FileOutputStream fos = new FileOutputStream(path.toFile())) {
fos.write("Good-bye, World!".getBytes("UTF-8"));
fos.getFD().sync();
} finally {
fos.close();
}
assertTrue(watcher.checkAndReset());
// immediately after file changes, but before the next check time is reached, answer should be false
assertFalse(watcher.checkAndReset());
Thread.sleep(30L);
// after check time has passed, answer should be true
Thread.sleep(41L);
assertTrue(watcher.checkAndReset());
assertFalse(watcher.checkAndReset());
}
}