This commit is contained in:
Clebert Suconic 2020-07-09 08:17:54 -04:00
commit eb54564e52
3 changed files with 43 additions and 13 deletions

View File

@ -444,12 +444,12 @@ public class JournalFilesRepository {
pushOpen(); pushOpen();
nextFile = openedFiles.poll(journalFileOpenTimeout, TimeUnit.SECONDS); nextFile = openedFiles.poll(journalFileOpenTimeout, TimeUnit.SECONDS);
} } else {
if (openedFiles.isEmpty()) { if (openedFiles.isEmpty()) {
// if empty, push to open one. // if empty, push to open one.
pushOpen(); pushOpen();
} }
}
if (nextFile == null) { if (nextFile == null) {

View File

@ -47,6 +47,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.activemq.artemis.api.core.ActiveMQShutdownException; import org.apache.activemq.artemis.api.core.ActiveMQShutdownException;
import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.core.io.DummyCallback;
import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener; import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.io.SequentialFile; import org.apache.activemq.artemis.core.io.SequentialFile;
@ -840,7 +841,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
throw ActiveMQJournalBundle.BUNDLE.recordLargerThanStoreMax(addRecordEncodeSize, maxRecordSize); throw ActiveMQJournalBundle.BUNDLE.recordLargerThanStoreMax(addRecordEncodeSize, maxRecordSize);
} }
final SimpleFuture<Boolean> result = newSyncAndCallbackResult(sync, callback); final SimpleFuture<Boolean> result = newSyncAndCallbackResult(callback);
appendExecutor.execute(new Runnable() { appendExecutor.execute(new Runnable() {
@Override @Override
public void run() { public void run() {
@ -932,7 +933,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
Object record, Object record,
boolean sync, boolean sync,
IOCompletion callback) throws InterruptedException, java.util.concurrent.ExecutionException { IOCompletion callback) throws InterruptedException, java.util.concurrent.ExecutionException {
final SimpleFuture<Boolean> result = newSyncAndCallbackResult(sync, callback); final SimpleFuture<Boolean> result = newSyncAndCallbackResult(callback);
appendExecutor.execute(new Runnable() { appendExecutor.execute(new Runnable() {
@Override @Override
public void run() { public void run() {
@ -1016,7 +1017,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
private void internalAppendDeleteRecord(long id, private void internalAppendDeleteRecord(long id,
boolean sync, boolean sync,
IOCompletion callback) throws InterruptedException, java.util.concurrent.ExecutionException { IOCompletion callback) throws InterruptedException, java.util.concurrent.ExecutionException {
final SimpleFuture<Boolean> result = newSyncAndCallbackResult(sync, callback); final SimpleFuture<Boolean> result = newSyncAndCallbackResult(callback);
appendExecutor.execute(new Runnable() { appendExecutor.execute(new Runnable() {
@Override @Override
public void run() { public void run() {
@ -1063,8 +1064,12 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
result.get(); result.get();
} }
private static SimpleFuture newSyncAndCallbackResult(boolean sync, IOCompletion callback) { private static SimpleFuture newSyncAndCallbackResult(IOCompletion callback) {
return (sync && callback == null) ? new SimpleFutureImpl<>() : SimpleFuture.dumb(); if (callback != null && callback != DummyCallback.getInstance()) {
return SimpleFuture.dumb();
} else {
return new SimpleFutureImpl<>();
}
} }
@Override @Override
@ -1290,7 +1295,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
logger.trace("scheduling appendPrepareRecord::txID=" + txID); logger.trace("scheduling appendPrepareRecord::txID=" + txID);
} }
final SimpleFuture<JournalTransaction> result = newSyncAndCallbackResult(sync, callback); final SimpleFuture<JournalTransaction> result = newSyncAndCallbackResult(callback);
appendExecutor.execute(new Runnable() { appendExecutor.execute(new Runnable() {
@Override @Override
@ -1376,7 +1381,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
} }
final SimpleFuture<JournalTransaction> result = newSyncAndCallbackResult(sync, callback); final SimpleFuture<JournalTransaction> result = newSyncAndCallbackResult(callback);
appendExecutor.execute(new Runnable() { appendExecutor.execute(new Runnable() {
@Override @Override
@ -1430,7 +1435,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
final SimpleFuture<JournalTransaction> result = newSyncAndCallbackResult(sync, callback); final SimpleFuture<JournalTransaction> result = newSyncAndCallbackResult(callback);
appendExecutor.execute(new Runnable() { appendExecutor.execute(new Runnable() {
@Override @Override
public void run() { public void run() {

View File

@ -117,6 +117,31 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
stopJournal(); stopJournal();
} }
@Test
public void testFlushAppendsAndDeletes() throws Exception {
setup(10, 10 * 1024, true);
createJournal();
startJournal();
load();
byte[] record = new byte[1000];
for (int i = 0; i < record.length; i++) {
record[i] = (byte) 'a';
}
// Appending records after restart should be valid (not throwing any
// exceptions)
for (int i = 0; i < 10_000; i++) {
journal.appendAddRecord(i, (byte) 1, new SimpleEncoding(2, (byte) 'a'), false);
journal.appendDeleteRecord(i, false);
}
stopJournal();
List<String> files = fileFactory.listFiles(fileExtension);
// I am allowing one extra as a possible race with pushOpenFiles. I have not seen it happening on my test
// but it wouldn't be a problem if it happened
Assert.assertTrue("Supposed to have up to 10 files", files.size() <= 11);
}
@Test @Test
public void testParams() throws Exception { public void testParams() throws Exception {
try { try {