From aa5d76e1bb44787045dacefbdeff754078d5a141 Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Tue, 30 Jul 2019 11:02:08 -0400 Subject: [PATCH] ARTEMIS-2414 AIOSequentialFile was ignoring sync and leaking files --- .../core/io/aio/AIOSequentialFile.java | 48 ++++++++--------- .../tests/util/NoProcessFilesBehind.java | 52 ++++++++++++++----- 2 files changed, 61 insertions(+), 39 deletions(-) diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFile.java index 793dc701b6..86aee94d11 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFile.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFile.java @@ -107,36 +107,32 @@ public class AIOSequentialFile extends AbstractSequentialFile { } super.close(); - - if (waitSync) { - final String fileName = this.getFileName(); - try { - int waitCount = 0; - while (!pendingCallbacks.await(10, TimeUnit.SECONDS)) { - waitCount++; - if (waitCount == 1) { - final ThreadInfo[] threads = ManagementFactory.getThreadMXBean().dumpAllThreads(true, true); - for (ThreadInfo threadInfo : threads) { - ActiveMQJournalLogger.LOGGER.warn(threadInfo.toString()); + try { + if (waitSync) { + final String fileName = this.getFileName(); + try { + int waitCount = 0; + while (!pendingCallbacks.await(10, TimeUnit.SECONDS)) { + waitCount++; + if (waitCount == 1) { + final ThreadInfo[] threads = ManagementFactory.getThreadMXBean().dumpAllThreads(true, true); + for (ThreadInfo threadInfo : threads) { + ActiveMQJournalLogger.LOGGER.warn(threadInfo.toString()); + } + factory.onIOError(new IOException("Timeout on close"), "Timeout on close", this); } - factory.onIOError(new IOException("Timeout on close"), "Timeout on close", this); + ActiveMQJournalLogger.LOGGER.warn("waiting pending callbacks on " + fileName + " from " + (waitCount * 10) + " seconds!"); } - ActiveMQJournalLogger.LOGGER.warn("waiting pending callbacks on " + fileName + " from " + (waitCount * 10) + " seconds!"); + } catch (InterruptedException e) { + ActiveMQJournalLogger.LOGGER.warn("interrupted while waiting pending callbacks on " + fileName, e); + throw e; } - } catch (InterruptedException e) { - ActiveMQJournalLogger.LOGGER.warn("interrupted while waiting pending callbacks on " + fileName, e); - throw e; - } finally { - - opened = false; - - timedBuffer = null; - - aioFile.close(); - - aioFile = null; - } + } finally { + opened = false; + timedBuffer = null; + aioFile.close(); + aioFile = null; } } diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/NoProcessFilesBehind.java b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/NoProcessFilesBehind.java index e04b6ceaf5..949d678e11 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/NoProcessFilesBehind.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/NoProcessFilesBehind.java @@ -41,16 +41,26 @@ public class NoProcessFilesBehind extends TestWatcher { private static Logger log = Logger.getLogger(NoProcessFilesBehind.class); + public NoProcessFilesBehind(long maxFiles) { + this(-1, maxFiles); + } + /** * -1 on maxVariance means no check */ - public NoProcessFilesBehind(long maxFiles) { + public NoProcessFilesBehind(long variance, long maxFiles) { this.maxFiles = maxFiles; + if (variance < 0) { + maxvariance = null; + } else { + this.maxvariance = variance; + } } long fdBefore; long maxFiles; + Long maxvariance; static OperatingSystemMXBean os = ManagementFactory.getOperatingSystemMXBean(); @@ -65,7 +75,9 @@ public class NoProcessFilesBehind extends TestWatcher { @Override protected void starting(Description description) { LibaioContext.isLoaded(); - fdBefore = getOpenFD(); + if (maxvariance != null) { + fdBefore = getOpenFD(); + } } public static List getOpenFiles(boolean filtered) { @@ -115,20 +127,34 @@ public class NoProcessFilesBehind extends TestWatcher { protected void finished(Description description) { Wait.waitFor(() -> getOpenFD() < maxFiles, 5000, 0); - if (getOpenFD() >= maxFiles) { - List openFiles = getOpenFiles(true); - StringWriter stringWriter = new StringWriter(); - PrintWriter printWriter = new PrintWriter(stringWriter); - boolean first = true; - for (String str : openFiles) { - if (!first) printWriter.print(", "); - first = false; - printWriter.print(str); + + if (maxvariance != null) { + long currentVariance = getOpenFD() - fdBefore; + + if (currentVariance > 0 && currentVariance > maxvariance) { + Assert.fail("too many files were opened files on this test::" + getOpenList()); } - Assert.fail("Too many files open (" + maxFiles + "). A possible list: " + stringWriter.toString()); + + } + + if (!Wait.waitFor(() -> getOpenFD() < maxFiles, 5000, 0)) { + String fileList = getOpenList(); + Assert.fail("Too many files open (" + maxFiles + "). A possible list: " + fileList); } - Wait.assertTrue("Too many open files", () -> getOpenFD() < maxFiles, 5000, 0); } + private String getOpenList() { + List openFiles = getOpenFiles(true); + StringWriter stringWriter = new StringWriter(); + PrintWriter printWriter = new PrintWriter(stringWriter); + boolean first = true; + for (String str : openFiles) { + if (!first) printWriter.print("\n"); + first = false; + printWriter.print(str); + } + return stringWriter.toString(); + } + }