ARTEMIS-3513 Compacting exception invalidates deletes and updates
This commit is contained in:
parent
5ab8ed2803
commit
ef63dc95bb
|
@ -33,6 +33,10 @@ public interface SequentialFileFactory {
|
|||
|
||||
SequentialFile createSequentialFile(String fileName);
|
||||
|
||||
default SequentialFile createSequentialFile(String fileName, int capacity) {
|
||||
return createSequentialFile(fileName);
|
||||
}
|
||||
|
||||
int getMaxIO();
|
||||
|
||||
/**
|
||||
|
|
|
@ -30,9 +30,12 @@ import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
|
|||
import org.apache.activemq.artemis.core.journal.EncodingSupport;
|
||||
import org.apache.activemq.artemis.utils.PowerOf2Util;
|
||||
import org.apache.activemq.artemis.utils.Env;
|
||||
import org.jboss.logging.Logger;
|
||||
|
||||
final class MappedFile implements AutoCloseable {
|
||||
|
||||
private static final Logger logger = Logger.getLogger(MappedFile.class);
|
||||
|
||||
private static final int OS_PAGE_SIZE = Env.osPageSize();
|
||||
private final MappedByteBuffer buffer;
|
||||
private final FileChannel channel;
|
||||
|
@ -58,8 +61,10 @@ final class MappedFile implements AutoCloseable {
|
|||
final FileChannel channel = FileChannel.open(file.toPath(), StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.READ);
|
||||
length = (int) channel.size();
|
||||
if (length != capacity && length != 0) {
|
||||
channel.close();
|
||||
throw new IllegalStateException("the file is not " + capacity + " bytes long!");
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("Adjusting capacity to " + length + " while it was " + capacity + " on file " + file);
|
||||
}
|
||||
capacity = length;
|
||||
}
|
||||
buffer = channel.map(FileChannel.MapMode.READ_WRITE, position, capacity);
|
||||
return new MappedFile(channel, buffer, 0, length);
|
||||
|
|
|
@ -62,6 +62,11 @@ public final class MappedSequentialFileFactory extends AbstractSequentialFileFac
|
|||
|
||||
@Override
|
||||
public SequentialFile createSequentialFile(String fileName) {
|
||||
return createSequentialFile(fileName, capacity);
|
||||
}
|
||||
|
||||
@Override
|
||||
public SequentialFile createSequentialFile(String fileName, int capacity) {
|
||||
final MappedSequentialFile mappedSequentialFile = new MappedSequentialFile(this, journalDir, new File(journalDir, fileName), capacity, critialErrorListener);
|
||||
if (this.timedBuffer == null) {
|
||||
return mappedSequentialFile;
|
||||
|
|
|
@ -21,6 +21,7 @@ import java.util.List;
|
|||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
|
||||
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
|
||||
import org.apache.activemq.artemis.core.journal.impl.JournalFile;
|
||||
import org.apache.activemq.artemis.core.persistence.Persister;
|
||||
|
@ -125,6 +126,14 @@ public interface Journal extends ActiveMQComponent {
|
|||
|
||||
void tryAppendUpdateRecord(long id, byte recordType, Persister persister, Object record, JournalUpdateCallback updateCallback, boolean sync, boolean replaceableUpdate) throws Exception;
|
||||
|
||||
default IOCriticalErrorListener getCriticalErrorListener() {
|
||||
return null;
|
||||
}
|
||||
|
||||
default Journal setCriticalErrorListener(IOCriticalErrorListener criticalErrorListener) {
|
||||
return this;
|
||||
}
|
||||
|
||||
default void appendUpdateRecord(long id,
|
||||
byte recordType,
|
||||
EncodingSupport record,
|
||||
|
|
|
@ -87,70 +87,70 @@ public abstract class AbstractJournalUpdateTask implements JournalReaderCallback
|
|||
final List<JournalFile> newFiles,
|
||||
final List<Pair<String, String>> renames) throws Exception {
|
||||
|
||||
SequentialFile controlFile = fileFactory.createSequentialFile(AbstractJournalUpdateTask.FILE_COMPACT_CONTROL);
|
||||
ActiveMQBuffer filesToRename = ActiveMQBuffers.dynamicBuffer(1);
|
||||
|
||||
// DataFiles first
|
||||
|
||||
if (files == null) {
|
||||
filesToRename.writeInt(0);
|
||||
} else {
|
||||
filesToRename.writeInt(files.size());
|
||||
|
||||
for (JournalFile file : files) {
|
||||
filesToRename.writeUTF(file.getFile().getFileName());
|
||||
}
|
||||
}
|
||||
|
||||
// New Files second
|
||||
|
||||
if (newFiles == null) {
|
||||
filesToRename.writeInt(0);
|
||||
} else {
|
||||
filesToRename.writeInt(newFiles.size());
|
||||
|
||||
for (JournalFile file : newFiles) {
|
||||
filesToRename.writeUTF(file.getFile().getFileName());
|
||||
}
|
||||
}
|
||||
|
||||
// Renames from clean up third
|
||||
if (renames == null) {
|
||||
filesToRename.writeInt(0);
|
||||
} else {
|
||||
filesToRename.writeInt(renames.size());
|
||||
for (Pair<String, String> rename : renames) {
|
||||
filesToRename.writeUTF(rename.getA());
|
||||
filesToRename.writeUTF(rename.getB());
|
||||
}
|
||||
}
|
||||
|
||||
JournalInternalRecord controlRecord = new JournalAddRecord(true, 1, (byte) 0, EncoderPersister.getInstance(), new ByteArrayEncoding(filesToRename.toByteBuffer().array()));
|
||||
|
||||
ActiveMQBuffer renameBuffer = ActiveMQBuffers.dynamicBuffer(filesToRename.writerIndex());
|
||||
|
||||
controlRecord.setFileID(0);
|
||||
|
||||
controlRecord.encode(renameBuffer);
|
||||
|
||||
ByteBuffer writeBuffer = fileFactory.newBuffer(renameBuffer.writerIndex());
|
||||
|
||||
writeBuffer.put(renameBuffer.toByteBuffer().array(), 0, renameBuffer.writerIndex());
|
||||
int position = writeBuffer.position();
|
||||
|
||||
writeBuffer.rewind();
|
||||
|
||||
|
||||
// the capacity here will only be applied to mapped files as they are created with the intended capacity and the control file needs to match the number of files needed
|
||||
SequentialFile controlFile = fileFactory.createSequentialFile(AbstractJournalUpdateTask.FILE_COMPACT_CONTROL, position + 1024);
|
||||
try {
|
||||
controlFile.open(1, false);
|
||||
|
||||
JournalImpl.initFileHeader(fileFactory, controlFile, 0, 0);
|
||||
|
||||
ActiveMQBuffer filesToRename = ActiveMQBuffers.dynamicBuffer(1);
|
||||
|
||||
// DataFiles first
|
||||
|
||||
if (files == null) {
|
||||
filesToRename.writeInt(0);
|
||||
} else {
|
||||
filesToRename.writeInt(files.size());
|
||||
|
||||
for (JournalFile file : files) {
|
||||
filesToRename.writeUTF(file.getFile().getFileName());
|
||||
}
|
||||
}
|
||||
|
||||
// New Files second
|
||||
|
||||
if (newFiles == null) {
|
||||
filesToRename.writeInt(0);
|
||||
} else {
|
||||
filesToRename.writeInt(newFiles.size());
|
||||
|
||||
for (JournalFile file : newFiles) {
|
||||
filesToRename.writeUTF(file.getFile().getFileName());
|
||||
}
|
||||
}
|
||||
|
||||
// Renames from clean up third
|
||||
if (renames == null) {
|
||||
filesToRename.writeInt(0);
|
||||
} else {
|
||||
filesToRename.writeInt(renames.size());
|
||||
for (Pair<String, String> rename : renames) {
|
||||
filesToRename.writeUTF(rename.getA());
|
||||
filesToRename.writeUTF(rename.getB());
|
||||
}
|
||||
}
|
||||
|
||||
JournalInternalRecord controlRecord = new JournalAddRecord(true, 1, (byte) 0, EncoderPersister.getInstance(), new ByteArrayEncoding(filesToRename.toByteBuffer().array()));
|
||||
|
||||
ActiveMQBuffer renameBuffer = ActiveMQBuffers.dynamicBuffer(filesToRename.writerIndex());
|
||||
|
||||
controlRecord.setFileID(0);
|
||||
|
||||
controlRecord.encode(renameBuffer);
|
||||
|
||||
ByteBuffer writeBuffer = fileFactory.newBuffer(renameBuffer.writerIndex());
|
||||
|
||||
writeBuffer.put(renameBuffer.toByteBuffer().array(), 0, renameBuffer.writerIndex());
|
||||
|
||||
writeBuffer.rewind();
|
||||
|
||||
controlFile.writeDirect(writeBuffer, true);
|
||||
|
||||
return controlFile;
|
||||
} finally {
|
||||
controlFile.close(false, false);
|
||||
}
|
||||
|
||||
return controlFile;
|
||||
}
|
||||
|
||||
static SequentialFile readControlFile(final SequentialFileFactory fileFactory,
|
||||
|
|
|
@ -408,7 +408,7 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ
|
|||
|
||||
if (pendingTransactions.get(transactionID) != null) {
|
||||
// Sanity check, this should never happen
|
||||
throw new IllegalStateException("Inconsistency during compacting: RollbackRecord ID = " + transactionID +
|
||||
logger.debug("Inconsistency during compacting: RollbackRecord ID = " + transactionID +
|
||||
" for an already rolled back transaction during compacting");
|
||||
} else {
|
||||
JournalTransaction newTransaction = newTransactions.remove(transactionID);
|
||||
|
|
|
@ -293,8 +293,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
|
|||
// Compacting may replace this structure
|
||||
private final ConcurrentLongHashMap<JournalTransaction> transactions = new ConcurrentLongHashMap<>();
|
||||
|
||||
private final IOCriticalErrorListener criticalErrorListener;
|
||||
|
||||
private IOCriticalErrorListener criticalErrorListener;
|
||||
|
||||
// This will be set only while the JournalCompactor is being executed
|
||||
private volatile JournalCompactor compactor;
|
||||
|
@ -486,6 +485,17 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public IOCriticalErrorListener getCriticalErrorListener() {
|
||||
return criticalErrorListener;
|
||||
}
|
||||
|
||||
@Override
|
||||
public JournalImpl setCriticalErrorListener(IOCriticalErrorListener criticalErrorListener) {
|
||||
this.criticalErrorListener = criticalErrorListener;
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ConcurrentLongHashMap<JournalRecord> getRecords() {
|
||||
return records;
|
||||
|
@ -1811,7 +1821,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
|
|||
* stop, start records will still come as this is being executed
|
||||
*/
|
||||
|
||||
public synchronized void compact() throws Exception {
|
||||
public synchronized void compact() {
|
||||
|
||||
if (compactor != null) {
|
||||
throw new IllegalStateException("There is pending compacting operation");
|
||||
|
@ -1926,6 +1936,13 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
|
|||
ActiveMQJournalLogger.LOGGER.compactMergeError(newTransaction.getId());
|
||||
}
|
||||
});
|
||||
} catch (Throwable e) {
|
||||
try {
|
||||
criticalIO(e);
|
||||
} catch (Throwable ignored) {
|
||||
logger.warn(ignored.getMessage(), ignored);
|
||||
}
|
||||
return;
|
||||
} finally {
|
||||
journalLock.writeLock().unlock();
|
||||
}
|
||||
|
@ -1934,27 +1951,22 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
|
|||
renameFiles(dataFilesToProcess, newDatafiles);
|
||||
deleteControlFile(controlFile);
|
||||
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("Flushing compacting on journal " + this);
|
||||
}
|
||||
|
||||
setAutoReclaim(previousReclaimValue);
|
||||
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("Finished compacting on journal " + this);
|
||||
}
|
||||
|
||||
} finally {
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("Flushing compacting on journal " + this);
|
||||
} catch (Throwable e) {
|
||||
try {
|
||||
criticalIO(e);
|
||||
} catch (Throwable ignored) {
|
||||
logger.warn(ignored.getMessage(), ignored);
|
||||
}
|
||||
// An Exception was probably thrown, and the compactor was not cleared
|
||||
if (compactor != null) {
|
||||
try {
|
||||
compactor.flush();
|
||||
} catch (Throwable ignored) {
|
||||
}
|
||||
|
||||
compactor = null;
|
||||
}
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("since compact finished, setAutoReclaim back into " + previousReclaimValue);
|
||||
}
|
||||
setAutoReclaim(previousReclaimValue);
|
||||
}
|
||||
} finally {
|
||||
compactorLock.writeLock().unlock();
|
||||
|
|
|
@ -275,15 +275,26 @@ public class PostOfficeJournalLoader implements JournalLoader {
|
|||
public void handleDuplicateIds(Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap) throws Exception {
|
||||
for (Map.Entry<SimpleString, List<Pair<byte[], Long>>> entry : duplicateIDMap.entrySet()) {
|
||||
SimpleString address = entry.getKey();
|
||||
|
||||
DuplicateIDCache cache = postOffice.getDuplicateIDCache(address);
|
||||
|
||||
if (configuration.isPersistIDCache()) {
|
||||
DuplicateIDCache cache = postOffice.getDuplicateIDCache(address);
|
||||
cache.load(entry.getValue());
|
||||
} else {
|
||||
removeOldDuplicates(entry.getValue());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void removeOldDuplicates(List<Pair<byte[], Long>> ids) throws Exception {
|
||||
ids.forEach((pair) -> {
|
||||
try {
|
||||
storageManager.deleteDuplicateID(pair.getB());
|
||||
} catch (Exception e) {
|
||||
logger.warn(e.getMessage(), e);
|
||||
}
|
||||
});
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postLoad(Journal messageJournal,
|
||||
ResourceManager resourceManager,
|
||||
|
|
|
@ -522,6 +522,12 @@ public class NIOJournalCompactTest extends JournalImplTestBase {
|
|||
}
|
||||
};
|
||||
|
||||
AtomicInteger criticalErrors = new AtomicInteger(0);
|
||||
journal.setCriticalErrorListener((a, b, c) -> {
|
||||
System.out.println("Error");
|
||||
criticalErrors.incrementAndGet();
|
||||
});
|
||||
|
||||
journal.setAutoReclaim(false);
|
||||
|
||||
startJournal();
|
||||
|
@ -726,6 +732,12 @@ public class NIOJournalCompactTest extends JournalImplTestBase {
|
|||
startJournal();
|
||||
loadAndCheck();
|
||||
|
||||
if (createControlFile) {
|
||||
Assert.assertEquals(0, criticalErrors.get());
|
||||
} else {
|
||||
Assert.assertEquals(1, criticalErrors.get());
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -1231,6 +1243,73 @@ public class NIOJournalCompactTest extends JournalImplTestBase {
|
|||
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testReconfigureJournalSize() throws Exception {
|
||||
setup(2, 60 * 1024, false);
|
||||
|
||||
createJournal();
|
||||
startJournal();
|
||||
load();
|
||||
|
||||
int NUMBER_OF_RECORDS = 1000;
|
||||
|
||||
// add and remove some data to force reclaiming
|
||||
{
|
||||
ArrayList<Long> ids = new ArrayList<>();
|
||||
for (int i = 0; i < NUMBER_OF_RECORDS; i++) {
|
||||
long id = idGenerator.generateID();
|
||||
ids.add(id);
|
||||
add(id);
|
||||
if (i > 0 && i % 100 == 0) {
|
||||
journal.forceMoveNextFile();
|
||||
}
|
||||
}
|
||||
|
||||
journal.forceMoveNextFile();
|
||||
|
||||
journal.checkReclaimStatus();
|
||||
}
|
||||
|
||||
journal.testCompact();
|
||||
|
||||
stopJournal();
|
||||
|
||||
// expanding the size once
|
||||
setup(2, 120 * 1024, false);
|
||||
createJournal();
|
||||
startJournal();
|
||||
loadAndCheck();
|
||||
|
||||
// add and remove some data to force reclaiming
|
||||
{
|
||||
ArrayList<Long> ids = new ArrayList<>();
|
||||
for (int i = 0; i < NUMBER_OF_RECORDS; i++) {
|
||||
long id = idGenerator.generateID();
|
||||
ids.add(id);
|
||||
add(id);
|
||||
if (i > 0 && i % 100 == 0) {
|
||||
journal.forceMoveNextFile();
|
||||
}
|
||||
}
|
||||
|
||||
journal.forceMoveNextFile();
|
||||
|
||||
journal.checkReclaimStatus();
|
||||
}
|
||||
stopJournal();
|
||||
|
||||
// shrinking the size later
|
||||
setup(2, 30 * 1024, false);
|
||||
createJournal();
|
||||
startJournal();
|
||||
loadAndCheck();
|
||||
|
||||
journal.testCompact();
|
||||
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testLiveSize() throws Exception {
|
||||
setup(2, 60 * 1024, true);
|
||||
|
@ -1853,6 +1932,29 @@ public class NIOJournalCompactTest extends JournalImplTestBase {
|
|||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testLargeCompacting() throws Exception {
|
||||
setup(2, 4 * 1024, false);
|
||||
|
||||
createJournal();
|
||||
|
||||
startJournal();
|
||||
|
||||
load();
|
||||
|
||||
|
||||
for (int i = 0; i < 250; i += 2) {
|
||||
addTx(i, i + 1);
|
||||
|
||||
commit(i);
|
||||
|
||||
journal.forceMoveNextFile();
|
||||
}
|
||||
|
||||
journal.testCompact();
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
|
|
|
@ -0,0 +1,180 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.artemis.tests.integration.openwire;
|
||||
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.MessageConsumer;
|
||||
import javax.jms.MessageProducer;
|
||||
import javax.jms.Queue;
|
||||
import javax.jms.Session;
|
||||
import javax.jms.TextMessage;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.QueueConfiguration;
|
||||
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.core.config.Configuration;
|
||||
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds;
|
||||
import org.apache.activemq.artemis.core.server.JournalType;
|
||||
import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
|
||||
import org.jboss.logging.Logger;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
public class CompactingOpenWireTest extends BasicOpenWireTest {
|
||||
|
||||
private static final Logger logger = Logger.getLogger(CompactingOpenWireTest.class);
|
||||
|
||||
@Override
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
realStore = true;
|
||||
super.setUp();
|
||||
System.setProperty("org.apache.activemq.transport.AbstractInactivityMonitor.keepAliveTime", "2");
|
||||
createFactories();
|
||||
|
||||
for (int i = 0; i < 30; i++) {
|
||||
SimpleString coreQueue = new SimpleString(queueName + i);
|
||||
this.server.createQueue(new QueueConfiguration(coreQueue).setRoutingType(RoutingType.ANYCAST));
|
||||
testQueues.put(queueName, coreQueue);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String getConnectionUrl() {
|
||||
return "failover:" + urlString;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void extraServerConfig(Configuration serverConfig) {
|
||||
super.extraServerConfig(serverConfig);
|
||||
serverConfig.setIDCacheSize(500);
|
||||
serverConfig.setPersistIDCache(true);
|
||||
serverConfig.setJournalSyncTransactional(false);
|
||||
serverConfig.setJournalSyncNonTransactional(false);
|
||||
serverConfig.setJournalFileSize(10 * 1024);
|
||||
serverConfig.setJournalCompactMinFiles(1);
|
||||
serverConfig.setJournalCompactPercentage(0);
|
||||
serverConfig.setJournalType(JournalType.MAPPED);
|
||||
serverConfig.setJournalBufferTimeout_NIO(0);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTransactCompact() throws Exception {
|
||||
final int THREADS = 30;
|
||||
AtomicInteger errors = new AtomicInteger(0);
|
||||
AtomicBoolean running = new AtomicBoolean(true);
|
||||
ExecutorService executorService = Executors.newFixedThreadPool(THREADS + 1);
|
||||
CountDownLatch compactDone = new CountDownLatch(1);
|
||||
executorService.execute(() -> {
|
||||
while (running.get()) {
|
||||
try {
|
||||
server.getStorageManager().getMessageJournal().scheduleCompactAndBlock(10_000);
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
errors.incrementAndGet();
|
||||
}
|
||||
}
|
||||
compactDone.countDown();
|
||||
});
|
||||
CountDownLatch latchDone = new CountDownLatch(THREADS);
|
||||
AssertionLoggerHandler.startCapture();
|
||||
try {
|
||||
|
||||
String space1k = new String(new char[5]).replace('\0', ' ');
|
||||
for (int i = 0; i < THREADS; i++) {
|
||||
final int id = i % 10;
|
||||
executorService.submit(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
try (Connection connection = factory.createConnection()) {
|
||||
|
||||
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
|
||||
Session consumerSession = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
|
||||
|
||||
Queue queue = session.createQueue(queueName + id);
|
||||
MessageProducer producer = session.createProducer(queue);
|
||||
MessageConsumer consumer = consumerSession.createConsumer(queue);
|
||||
connection.start();
|
||||
|
||||
for (int j = 0; j < 1000 && running.get(); j++) {
|
||||
TextMessage textMessage = session.createTextMessage("test");
|
||||
textMessage.setStringProperty("1k", space1k);
|
||||
producer.send(textMessage);
|
||||
if (j % 2 == 0) {
|
||||
session.commit();
|
||||
TextMessage message = (TextMessage) consumer.receive(5000);
|
||||
Assert.assertNotNull(message);
|
||||
|
||||
Assert.assertEquals("test", message.getText());
|
||||
|
||||
message.acknowledge();
|
||||
} else {
|
||||
session.rollback();
|
||||
}
|
||||
|
||||
}
|
||||
logger.debug("Done! ");
|
||||
|
||||
} catch (Throwable t) {
|
||||
errors.incrementAndGet();
|
||||
t.printStackTrace();
|
||||
} finally {
|
||||
latchDone.countDown();
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
latchDone.await(10, TimeUnit.MINUTES);
|
||||
running.set(false);
|
||||
compactDone.await(10, TimeUnit.MINUTES);
|
||||
executorService.shutdownNow();
|
||||
Assert.assertEquals(0, errors.get());
|
||||
Assert.assertFalse(AssertionLoggerHandler.findText("AMQ144003")); // error compacting
|
||||
Assert.assertFalse(AssertionLoggerHandler.findText("AMQ222055")); // records not found
|
||||
} finally {
|
||||
AssertionLoggerHandler.stopCapture();
|
||||
running.set(false);
|
||||
executorService.shutdownNow();
|
||||
}
|
||||
|
||||
connection.close();
|
||||
|
||||
server.stop();
|
||||
|
||||
server.getConfiguration().setPersistIDCache(false);
|
||||
server.getConfiguration().setJournalPoolFiles(2);
|
||||
|
||||
server.start();
|
||||
server.waitForActivation(1, TimeUnit.SECONDS);
|
||||
server.getStorageManager().getMessageJournal().scheduleCompactAndBlock(60_000);
|
||||
server.stop();
|
||||
|
||||
Map<Integer, AtomicInteger> counts = countJournal(server.getConfiguration());
|
||||
counts.forEach((a, b) -> System.out.println(a + " = " + b));
|
||||
AtomicInteger duplicateIDCounts = counts.get((int)JournalRecordIds.DUPLICATE_ID);
|
||||
Assert.assertTrue("There are duplicate IDs on the journal even though the system was reconfigured to not persist them::" + duplicateIDCounts, duplicateIDCounts == null || duplicateIDCounts.incrementAndGet() == 0);
|
||||
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue