LUCENE-9061: Use an explicit executor service in async channel tests, otherwise they leak internal JVM threads.

This commit is contained in:
Dawid Weiss 2019-11-23 13:42:45 +01:00 committed by Dawid Weiss
parent 312431b182
commit fad75cf98d
2 changed files with 62 additions and 29 deletions

View File

@ -26,6 +26,11 @@ import java.nio.channels.SeekableByteChannel;
import java.nio.file.FileSystem;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collections;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.lucene.util.NamedThreadFactory;
/** Basic tests for LeakFS */
public class TestLeakFS extends MockFileSystemTestCase {
@ -75,16 +80,25 @@ public class TestLeakFS extends MockFileSystemTestCase {
}
/** Test leaks via AsynchronousFileChannel.open */
public void testLeakAsyncFileChannel() throws IOException {
public void testLeakAsyncFileChannel() throws IOException, InterruptedException {
Path dir = wrap(createTempDir());
OutputStream file = Files.newOutputStream(dir.resolve("stillopen"));
file.write(5);
file.close();
AsynchronousFileChannel leak = AsynchronousFileChannel.open(dir.resolve("stillopen"));
Exception e = expectThrows(Exception.class, () -> dir.getFileSystem().close());
assertTrue(e.getMessage().contains("file handle leaks"));
leak.close();
ExecutorService executorService = Executors.newFixedThreadPool(1,
new NamedThreadFactory("async-io"));
try {
AsynchronousFileChannel leak = AsynchronousFileChannel.open(dir.resolve("stillopen"),
Collections.emptySet(), executorService);
Exception e = expectThrows(Exception.class, () -> dir.getFileSystem().close());
assertTrue(e.getMessage().contains("file handle leaks"));
leak.close();
} finally {
executorService.shutdown();
executorService.awaitTermination(5, TimeUnit.SECONDS);
}
}
/** Test leaks via Files.newByteChannel */

View File

@ -27,34 +27,39 @@ import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.lucene.util.InfoStream;
import org.apache.lucene.util.NamedThreadFactory;
/** Basic tests for VerboseFS */
public class TestVerboseFS extends MockFileSystemTestCase {
@Override
protected Path wrap(Path path) {
return wrap(path, InfoStream.NO_OUTPUT);
}
Path wrap(Path path, InfoStream stream) {
FileSystem fs = new VerboseFS(path.getFileSystem(), stream).getFileSystem(URI.create("file:///"));
return new FilterPath(path, fs);
}
/** InfoStream that looks for a substring and indicates if it saw it */
static class InfoStreamListener extends InfoStream {
/** True if we saw the message */
final AtomicBoolean seenMessage = new AtomicBoolean(false);
/** Expected message */
/** Expected message */
final String messageStartsWith;
InfoStreamListener(String messageStartsWith) {
this.messageStartsWith = messageStartsWith;
}
@Override
public void close() throws IOException {}
@ -69,12 +74,12 @@ public class TestVerboseFS extends MockFileSystemTestCase {
public boolean isEnabled(String component) {
return true;
}
boolean sawMessage() {
return seenMessage.get();
}
}
/** Test createDirectory */
public void testCreateDirectory() throws IOException {
InfoStreamListener stream = new InfoStreamListener("createDirectory");
@ -84,7 +89,7 @@ public class TestVerboseFS extends MockFileSystemTestCase {
expectThrows(IOException.class, () -> Files.createDirectory(dir.resolve("subdir")));
}
/** Test delete */
public void testDelete() throws IOException {
InfoStreamListener stream = new InfoStreamListener("delete");
@ -95,7 +100,7 @@ public class TestVerboseFS extends MockFileSystemTestCase {
expectThrows(IOException.class, () -> Files.delete(dir.resolve("foobar")));
}
/** Test deleteIfExists */
public void testDeleteIfExists() throws IOException {
InfoStreamListener stream = new InfoStreamListener("deleteIfExists");
@ -107,7 +112,7 @@ public class TestVerboseFS extends MockFileSystemTestCase {
// no exception
Files.deleteIfExists(dir.resolve("foobar"));
}
/** Test copy */
public void testCopy() throws IOException {
InfoStreamListener stream = new InfoStreamListener("copy");
@ -118,7 +123,7 @@ public class TestVerboseFS extends MockFileSystemTestCase {
expectThrows(IOException.class, () -> Files.copy(dir.resolve("nonexistent"), dir.resolve("something")));
}
/** Test move */
public void testMove() throws IOException {
InfoStreamListener stream = new InfoStreamListener("move");
@ -129,7 +134,7 @@ public class TestVerboseFS extends MockFileSystemTestCase {
expectThrows(IOException.class, () -> Files.move(dir.resolve("nonexistent"), dir.resolve("something")));
}
/** Test newOutputStream */
public void testNewOutputStream() throws IOException {
InfoStreamListener stream = new InfoStreamListener("newOutputStream");
@ -140,7 +145,7 @@ public class TestVerboseFS extends MockFileSystemTestCase {
expectThrows(IOException.class, () -> Files.newOutputStream(dir.resolve("output"), StandardOpenOption.CREATE_NEW));
}
/** Test FileChannel.open */
public void testFileChannel() throws IOException {
InfoStreamListener stream = new InfoStreamListener("newFileChannel");
@ -152,19 +157,34 @@ public class TestVerboseFS extends MockFileSystemTestCase {
expectThrows(IOException.class, () -> FileChannel.open(dir.resolve("foobar"),
StandardOpenOption.CREATE_NEW, StandardOpenOption.READ, StandardOpenOption.WRITE));
}
/** Test AsynchronousFileChannel.open */
public void testAsyncFileChannel() throws IOException {
public void testAsyncFileChannel() throws IOException, InterruptedException {
InfoStreamListener stream = new InfoStreamListener("newAsynchronousFileChannel");
Path dir = wrap(createTempDir(), stream);
AsynchronousFileChannel channel = AsynchronousFileChannel.open(dir.resolve("foobar"), StandardOpenOption.CREATE_NEW, StandardOpenOption.READ, StandardOpenOption.WRITE);
assertTrue(stream.sawMessage());
channel.close();
expectThrows(IOException.class, () -> AsynchronousFileChannel.open(dir.resolve("foobar"),
StandardOpenOption.CREATE_NEW, StandardOpenOption.READ, StandardOpenOption.WRITE));
ExecutorService executorService = Executors.newFixedThreadPool(1,
new NamedThreadFactory("async-io"));
try {
Set<StandardOpenOption> opts = Set
.of(StandardOpenOption.CREATE_NEW, StandardOpenOption.READ,
StandardOpenOption.WRITE);
AsynchronousFileChannel channel = AsynchronousFileChannel
.open(dir.resolve("foobar"), opts, executorService);
assertTrue(stream.sawMessage());
channel.close();
expectThrows(IOException.class, () -> AsynchronousFileChannel.open(dir.resolve("foobar"),
opts, executorService));
expectThrows(NoSuchFileException.class,
() -> AsynchronousFileChannel.open(dir.resolve("doesNotExist.rip")));
} finally {
executorService.shutdown();
executorService.awaitTermination(5, TimeUnit.SECONDS);
}
}
/** Test newByteChannel */
public void testByteChannel() throws IOException {
InfoStreamListener stream = new InfoStreamListener("newByteChannel");
@ -176,11 +196,10 @@ public class TestVerboseFS extends MockFileSystemTestCase {
expectThrows(IOException.class, () -> Files.newByteChannel(dir.resolve("foobar"),
StandardOpenOption.CREATE_NEW, StandardOpenOption.READ, StandardOpenOption.WRITE));
}
/** Test that verbose does not corrupt file not found exceptions */
public void testVerboseFSNoSuchFileException() {
Path dir = wrap(createTempDir());
expectThrows(NoSuchFileException.class, () -> AsynchronousFileChannel.open(dir.resolve("doesNotExist.rip")));
expectThrows(NoSuchFileException.class, () -> FileChannel.open(dir.resolve("doesNotExist.rip")));
expectThrows(NoSuchFileException.class, () -> Files.newByteChannel(dir.resolve("stillopen")));
}