ARTEMIS-2837 Bursts of open files under high load
This would prevent to push a new opened file if there is already one available to be consumed
This commit is contained in:
parent
851aef1172
commit
a6bf7d0e04
|
@ -88,11 +88,14 @@ public class JournalFilesRepository {
|
||||||
private final Runnable pushOpenRunnable = new Runnable() {
|
private final Runnable pushOpenRunnable = new Runnable() {
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
try {
|
// if there's already an opened file there is no need to push a new one
|
||||||
pushOpenedFile();
|
if (openedFiles.isEmpty()) {
|
||||||
} catch (Exception e) {
|
try {
|
||||||
ActiveMQJournalLogger.LOGGER.errorPushingFile(e);
|
pushOpenedFile();
|
||||||
fileFactory.onIOError(e, "unable to open ", null);
|
} catch (Exception e) {
|
||||||
|
ActiveMQJournalLogger.LOGGER.errorPushingFile(e);
|
||||||
|
fileFactory.onIOError(e, "unable to open ", null);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
@ -444,11 +447,11 @@ public class JournalFilesRepository {
|
||||||
pushOpen();
|
pushOpen();
|
||||||
|
|
||||||
nextFile = openedFiles.poll(journalFileOpenTimeout, TimeUnit.SECONDS);
|
nextFile = openedFiles.poll(journalFileOpenTimeout, TimeUnit.SECONDS);
|
||||||
}
|
} else {
|
||||||
|
if (openedFiles.isEmpty()) {
|
||||||
if (openedFiles.isEmpty()) {
|
// if empty, push to open one.
|
||||||
// if empty, push to open one.
|
pushOpen();
|
||||||
pushOpen();
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (nextFile == null) {
|
if (nextFile == null) {
|
||||||
|
|
|
@ -117,6 +117,31 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
|
||||||
stopJournal();
|
stopJournal();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testFlushAppendsAndDeletes() throws Exception {
|
||||||
|
setup(10, 10 * 1024, true);
|
||||||
|
createJournal();
|
||||||
|
startJournal();
|
||||||
|
load();
|
||||||
|
byte[] record = new byte[1000];
|
||||||
|
for (int i = 0; i < record.length; i++) {
|
||||||
|
record[i] = (byte) 'a';
|
||||||
|
}
|
||||||
|
// Appending records after restart should be valid (not throwing any
|
||||||
|
// exceptions)
|
||||||
|
for (int i = 0; i < 10_000; i++) {
|
||||||
|
journal.appendAddRecord(i, (byte) 1, new SimpleEncoding(2, (byte) 'a'), false);
|
||||||
|
journal.appendDeleteRecord(i, false);
|
||||||
|
}
|
||||||
|
stopJournal();
|
||||||
|
|
||||||
|
List<String> files = fileFactory.listFiles(fileExtension);
|
||||||
|
|
||||||
|
// I am allowing one extra as a possible race with pushOpenFiles. I have not seen it happening on my test
|
||||||
|
// but it wouldn't be a problem if it happened
|
||||||
|
Assert.assertTrue("Supposed to have up to 10 files", files.size() <= 11);
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testParams() throws Exception {
|
public void testParams() throws Exception {
|
||||||
try {
|
try {
|
||||||
|
|
Loading…
Reference in New Issue