ARTEMIS-2414 AIOSequentialFile was ignoring sync and leaking files

This commit is contained in:
Clebert Suconic 2019-07-30 11:02:08 -04:00
parent c140af1f8d
commit aa5d76e1bb
2 changed files with 61 additions and 39 deletions

View File

@ -107,36 +107,32 @@ public class AIOSequentialFile extends AbstractSequentialFile {
} }
super.close(); super.close();
try {
if (waitSync) { if (waitSync) {
final String fileName = this.getFileName(); final String fileName = this.getFileName();
try { try {
int waitCount = 0; int waitCount = 0;
while (!pendingCallbacks.await(10, TimeUnit.SECONDS)) { while (!pendingCallbacks.await(10, TimeUnit.SECONDS)) {
waitCount++; waitCount++;
if (waitCount == 1) { if (waitCount == 1) {
final ThreadInfo[] threads = ManagementFactory.getThreadMXBean().dumpAllThreads(true, true); final ThreadInfo[] threads = ManagementFactory.getThreadMXBean().dumpAllThreads(true, true);
for (ThreadInfo threadInfo : threads) { for (ThreadInfo threadInfo : threads) {
ActiveMQJournalLogger.LOGGER.warn(threadInfo.toString()); ActiveMQJournalLogger.LOGGER.warn(threadInfo.toString());
}
factory.onIOError(new IOException("Timeout on close"), "Timeout on close", this);
} }
factory.onIOError(new IOException("Timeout on close"), "Timeout on close", this); ActiveMQJournalLogger.LOGGER.warn("waiting pending callbacks on " + fileName + " from " + (waitCount * 10) + " seconds!");
} }
ActiveMQJournalLogger.LOGGER.warn("waiting pending callbacks on " + fileName + " from " + (waitCount * 10) + " seconds!"); } catch (InterruptedException e) {
ActiveMQJournalLogger.LOGGER.warn("interrupted while waiting pending callbacks on " + fileName, e);
throw e;
} }
} catch (InterruptedException e) {
ActiveMQJournalLogger.LOGGER.warn("interrupted while waiting pending callbacks on " + fileName, e);
throw e;
} finally {
opened = false;
timedBuffer = null;
aioFile.close();
aioFile = null;
} }
} finally {
opened = false;
timedBuffer = null;
aioFile.close();
aioFile = null;
} }
} }

View File

@ -41,16 +41,26 @@ public class NoProcessFilesBehind extends TestWatcher {
private static Logger log = Logger.getLogger(NoProcessFilesBehind.class); private static Logger log = Logger.getLogger(NoProcessFilesBehind.class);
public NoProcessFilesBehind(long maxFiles) {
this(-1, maxFiles);
}
/** /**
* -1 on maxVariance means no check * -1 on maxVariance means no check
*/ */
public NoProcessFilesBehind(long maxFiles) { public NoProcessFilesBehind(long variance, long maxFiles) {
this.maxFiles = maxFiles; this.maxFiles = maxFiles;
if (variance < 0) {
maxvariance = null;
} else {
this.maxvariance = variance;
}
} }
long fdBefore; long fdBefore;
long maxFiles; long maxFiles;
Long maxvariance;
static OperatingSystemMXBean os = ManagementFactory.getOperatingSystemMXBean(); static OperatingSystemMXBean os = ManagementFactory.getOperatingSystemMXBean();
@ -65,7 +75,9 @@ public class NoProcessFilesBehind extends TestWatcher {
@Override @Override
protected void starting(Description description) { protected void starting(Description description) {
LibaioContext.isLoaded(); LibaioContext.isLoaded();
fdBefore = getOpenFD(); if (maxvariance != null) {
fdBefore = getOpenFD();
}
} }
public static List<String> getOpenFiles(boolean filtered) { public static List<String> getOpenFiles(boolean filtered) {
@ -115,20 +127,34 @@ public class NoProcessFilesBehind extends TestWatcher {
protected void finished(Description description) { protected void finished(Description description) {
Wait.waitFor(() -> getOpenFD() < maxFiles, 5000, 0); Wait.waitFor(() -> getOpenFD() < maxFiles, 5000, 0);
if (getOpenFD() >= maxFiles) {
List<String> openFiles = getOpenFiles(true); if (maxvariance != null) {
StringWriter stringWriter = new StringWriter(); long currentVariance = getOpenFD() - fdBefore;
PrintWriter printWriter = new PrintWriter(stringWriter);
boolean first = true; if (currentVariance > 0 && currentVariance > maxvariance) {
for (String str : openFiles) { Assert.fail("too many files were opened files on this test::" + getOpenList());
if (!first) printWriter.print(", ");
first = false;
printWriter.print(str);
} }
Assert.fail("Too many files open (" + maxFiles + "). A possible list: " + stringWriter.toString());
}
if (!Wait.waitFor(() -> getOpenFD() < maxFiles, 5000, 0)) {
String fileList = getOpenList();
Assert.fail("Too many files open (" + maxFiles + "). A possible list: " + fileList);
} }
Wait.assertTrue("Too many open files", () -> getOpenFD() < maxFiles, 5000, 0);
} }
private String getOpenList() {
List<String> openFiles = getOpenFiles(true);
StringWriter stringWriter = new StringWriter();
PrintWriter printWriter = new PrintWriter(stringWriter);
boolean first = true;
for (String str : openFiles) {
if (!first) printWriter.print("\n");
first = false;
printWriter.print(str);
}
return stringWriter.toString();
}
} }