Revert "ARTEMIS-2837 Avoiding bursts on writes and pending callbacks"

This reverts commit 1761f763
This commit is contained in:
franz1981 2020-08-07 20:05:15 +02:00 committed by Clebert Suconic
parent 093cbe237f
commit 851aef1172
3 changed files with 13 additions and 43 deletions

View File

@ -444,11 +444,11 @@ public class JournalFilesRepository {
pushOpen();
nextFile = openedFiles.poll(journalFileOpenTimeout, TimeUnit.SECONDS);
} else {
if (openedFiles.isEmpty()) {
// if empty, push to open one.
pushOpen();
}
}
if (openedFiles.isEmpty()) {
// if empty, push to open one.
pushOpen();
}
if (nextFile == null) {

View File

@ -47,7 +47,6 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.activemq.artemis.api.core.ActiveMQShutdownException;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.core.io.DummyCallback;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.io.SequentialFile;
@ -841,7 +840,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
throw ActiveMQJournalBundle.BUNDLE.recordLargerThanStoreMax(addRecordEncodeSize, maxRecordSize);
}
final SimpleFuture<Boolean> result = newSyncAndCallbackResult(callback);
final SimpleFuture<Boolean> result = newSyncAndCallbackResult(sync, callback);
appendExecutor.execute(new Runnable() {
@Override
public void run() {
@ -933,7 +932,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
Object record,
boolean sync,
IOCompletion callback) throws InterruptedException, java.util.concurrent.ExecutionException {
final SimpleFuture<Boolean> result = newSyncAndCallbackResult(callback);
final SimpleFuture<Boolean> result = newSyncAndCallbackResult(sync, callback);
appendExecutor.execute(new Runnable() {
@Override
public void run() {
@ -1017,7 +1016,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
private void internalAppendDeleteRecord(long id,
boolean sync,
IOCompletion callback) throws InterruptedException, java.util.concurrent.ExecutionException {
final SimpleFuture<Boolean> result = newSyncAndCallbackResult(callback);
final SimpleFuture<Boolean> result = newSyncAndCallbackResult(sync, callback);
appendExecutor.execute(new Runnable() {
@Override
public void run() {
@ -1064,12 +1063,8 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
result.get();
}
private static SimpleFuture newSyncAndCallbackResult(IOCompletion callback) {
if (callback != null && callback != DummyCallback.getInstance()) {
return SimpleFuture.dumb();
} else {
return new SimpleFutureImpl<>();
}
private static SimpleFuture newSyncAndCallbackResult(boolean sync, IOCompletion callback) {
return (sync && callback == null) ? new SimpleFutureImpl<>() : SimpleFuture.dumb();
}
@Override
@ -1295,7 +1290,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
logger.trace("scheduling appendPrepareRecord::txID=" + txID);
}
final SimpleFuture<JournalTransaction> result = newSyncAndCallbackResult(callback);
final SimpleFuture<JournalTransaction> result = newSyncAndCallbackResult(sync, callback);
appendExecutor.execute(new Runnable() {
@Override
@ -1381,7 +1376,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
}
final SimpleFuture<JournalTransaction> result = newSyncAndCallbackResult(callback);
final SimpleFuture<JournalTransaction> result = newSyncAndCallbackResult(sync, callback);
appendExecutor.execute(new Runnable() {
@Override
@ -1435,7 +1430,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
final SimpleFuture<JournalTransaction> result = newSyncAndCallbackResult(callback);
final SimpleFuture<JournalTransaction> result = newSyncAndCallbackResult(sync, callback);
appendExecutor.execute(new Runnable() {
@Override
public void run() {

View File

@ -117,31 +117,6 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
stopJournal();
}
@Test
public void testFlushAppendsAndDeletes() throws Exception {
setup(10, 10 * 1024, true);
createJournal();
startJournal();
load();
byte[] record = new byte[1000];
for (int i = 0; i < record.length; i++) {
record[i] = (byte) 'a';
}
// Appending records after restart should be valid (not throwing any
// exceptions)
for (int i = 0; i < 10_000; i++) {
journal.appendAddRecord(i, (byte) 1, new SimpleEncoding(2, (byte) 'a'), false);
journal.appendDeleteRecord(i, false);
}
stopJournal();
List<String> files = fileFactory.listFiles(fileExtension);
// I am allowing one extra as a possible race with pushOpenFiles. I have not seen it happening on my test
// but it wouldn't be a problem if it happened
Assert.assertTrue("Supposed to have up to 10 files", files.size() <= 11);
}
@Test
public void testParams() throws Exception {
try {