ARTEMIS-2837 Avoiding bursts on writes and pending callbacks
This commit is contained in:
parent
fbeb9a492c
commit
1761f76308
|
@ -444,11 +444,11 @@ public class JournalFilesRepository {
|
||||||
pushOpen();
|
pushOpen();
|
||||||
|
|
||||||
nextFile = openedFiles.poll(journalFileOpenTimeout, TimeUnit.SECONDS);
|
nextFile = openedFiles.poll(journalFileOpenTimeout, TimeUnit.SECONDS);
|
||||||
}
|
} else {
|
||||||
|
if (openedFiles.isEmpty()) {
|
||||||
if (openedFiles.isEmpty()) {
|
// if empty, push to open one.
|
||||||
// if empty, push to open one.
|
pushOpen();
|
||||||
pushOpen();
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (nextFile == null) {
|
if (nextFile == null) {
|
||||||
|
|
|
@ -47,6 +47,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
|
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQShutdownException;
|
import org.apache.activemq.artemis.api.core.ActiveMQShutdownException;
|
||||||
import org.apache.activemq.artemis.api.core.Pair;
|
import org.apache.activemq.artemis.api.core.Pair;
|
||||||
|
import org.apache.activemq.artemis.core.io.DummyCallback;
|
||||||
import org.apache.activemq.artemis.core.io.IOCallback;
|
import org.apache.activemq.artemis.core.io.IOCallback;
|
||||||
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
|
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
|
||||||
import org.apache.activemq.artemis.core.io.SequentialFile;
|
import org.apache.activemq.artemis.core.io.SequentialFile;
|
||||||
|
@ -840,7 +841,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
|
||||||
throw ActiveMQJournalBundle.BUNDLE.recordLargerThanStoreMax(addRecordEncodeSize, maxRecordSize);
|
throw ActiveMQJournalBundle.BUNDLE.recordLargerThanStoreMax(addRecordEncodeSize, maxRecordSize);
|
||||||
}
|
}
|
||||||
|
|
||||||
final SimpleFuture<Boolean> result = newSyncAndCallbackResult(sync, callback);
|
final SimpleFuture<Boolean> result = newSyncAndCallbackResult(callback);
|
||||||
appendExecutor.execute(new Runnable() {
|
appendExecutor.execute(new Runnable() {
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
|
@ -932,7 +933,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
|
||||||
Object record,
|
Object record,
|
||||||
boolean sync,
|
boolean sync,
|
||||||
IOCompletion callback) throws InterruptedException, java.util.concurrent.ExecutionException {
|
IOCompletion callback) throws InterruptedException, java.util.concurrent.ExecutionException {
|
||||||
final SimpleFuture<Boolean> result = newSyncAndCallbackResult(sync, callback);
|
final SimpleFuture<Boolean> result = newSyncAndCallbackResult(callback);
|
||||||
appendExecutor.execute(new Runnable() {
|
appendExecutor.execute(new Runnable() {
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
|
@ -1016,7 +1017,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
|
||||||
private void internalAppendDeleteRecord(long id,
|
private void internalAppendDeleteRecord(long id,
|
||||||
boolean sync,
|
boolean sync,
|
||||||
IOCompletion callback) throws InterruptedException, java.util.concurrent.ExecutionException {
|
IOCompletion callback) throws InterruptedException, java.util.concurrent.ExecutionException {
|
||||||
final SimpleFuture<Boolean> result = newSyncAndCallbackResult(sync, callback);
|
final SimpleFuture<Boolean> result = newSyncAndCallbackResult(callback);
|
||||||
appendExecutor.execute(new Runnable() {
|
appendExecutor.execute(new Runnable() {
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
|
@ -1063,8 +1064,12 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
|
||||||
result.get();
|
result.get();
|
||||||
}
|
}
|
||||||
|
|
||||||
private static SimpleFuture newSyncAndCallbackResult(boolean sync, IOCompletion callback) {
|
private static SimpleFuture newSyncAndCallbackResult(IOCompletion callback) {
|
||||||
return (sync && callback == null) ? new SimpleFutureImpl<>() : SimpleFuture.dumb();
|
if (callback != null && callback != DummyCallback.getInstance()) {
|
||||||
|
return SimpleFuture.dumb();
|
||||||
|
} else {
|
||||||
|
return new SimpleFutureImpl<>();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -1290,7 +1295,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
|
||||||
logger.trace("scheduling appendPrepareRecord::txID=" + txID);
|
logger.trace("scheduling appendPrepareRecord::txID=" + txID);
|
||||||
}
|
}
|
||||||
|
|
||||||
final SimpleFuture<JournalTransaction> result = newSyncAndCallbackResult(sync, callback);
|
final SimpleFuture<JournalTransaction> result = newSyncAndCallbackResult(callback);
|
||||||
|
|
||||||
appendExecutor.execute(new Runnable() {
|
appendExecutor.execute(new Runnable() {
|
||||||
@Override
|
@Override
|
||||||
|
@ -1376,7 +1381,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
final SimpleFuture<JournalTransaction> result = newSyncAndCallbackResult(sync, callback);
|
final SimpleFuture<JournalTransaction> result = newSyncAndCallbackResult(callback);
|
||||||
|
|
||||||
appendExecutor.execute(new Runnable() {
|
appendExecutor.execute(new Runnable() {
|
||||||
@Override
|
@Override
|
||||||
|
@ -1430,7 +1435,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
final SimpleFuture<JournalTransaction> result = newSyncAndCallbackResult(sync, callback);
|
final SimpleFuture<JournalTransaction> result = newSyncAndCallbackResult(callback);
|
||||||
appendExecutor.execute(new Runnable() {
|
appendExecutor.execute(new Runnable() {
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
|
|
|
@ -117,6 +117,31 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
|
||||||
stopJournal();
|
stopJournal();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testFlushAppendsAndDeletes() throws Exception {
|
||||||
|
setup(10, 10 * 1024, true);
|
||||||
|
createJournal();
|
||||||
|
startJournal();
|
||||||
|
load();
|
||||||
|
byte[] record = new byte[1000];
|
||||||
|
for (int i = 0; i < record.length; i++) {
|
||||||
|
record[i] = (byte) 'a';
|
||||||
|
}
|
||||||
|
// Appending records after restart should be valid (not throwing any
|
||||||
|
// exceptions)
|
||||||
|
for (int i = 0; i < 10_000; i++) {
|
||||||
|
journal.appendAddRecord(i, (byte) 1, new SimpleEncoding(2, (byte) 'a'), false);
|
||||||
|
journal.appendDeleteRecord(i, false);
|
||||||
|
}
|
||||||
|
stopJournal();
|
||||||
|
|
||||||
|
List<String> files = fileFactory.listFiles(fileExtension);
|
||||||
|
|
||||||
|
// I am allowing one extra as a possible race with pushOpenFiles. I have not seen it happening on my test
|
||||||
|
// but it wouldn't be a problem if it happened
|
||||||
|
Assert.assertTrue("Supposed to have up to 10 files", files.size() <= 11);
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testParams() throws Exception {
|
public void testParams() throws Exception {
|
||||||
try {
|
try {
|
||||||
|
|
Loading…
Reference in New Issue