This commit is contained in:
Clebert Suconic 2021-10-05 12:35:47 -04:00
commit 2557f80897
11 changed files with 416 additions and 82 deletions

View File

@ -33,6 +33,10 @@ public interface SequentialFileFactory {
SequentialFile createSequentialFile(String fileName);
default SequentialFile createSequentialFile(String fileName, int capacity) {
return createSequentialFile(fileName);
}
int getMaxIO();
/**

View File

@ -30,9 +30,12 @@ import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
import org.apache.activemq.artemis.core.journal.EncodingSupport;
import org.apache.activemq.artemis.utils.PowerOf2Util;
import org.apache.activemq.artemis.utils.Env;
import org.jboss.logging.Logger;
final class MappedFile implements AutoCloseable {
private static final Logger logger = Logger.getLogger(MappedFile.class);
private static final int OS_PAGE_SIZE = Env.osPageSize();
private final MappedByteBuffer buffer;
private final FileChannel channel;
@ -58,8 +61,10 @@ final class MappedFile implements AutoCloseable {
final FileChannel channel = FileChannel.open(file.toPath(), StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.READ);
length = (int) channel.size();
if (length != capacity && length != 0) {
channel.close();
throw new IllegalStateException("the file is not " + capacity + " bytes long!");
if (logger.isDebugEnabled()) {
logger.debug("Adjusting capacity to " + length + " while it was " + capacity + " on file " + file);
}
capacity = length;
}
buffer = channel.map(FileChannel.MapMode.READ_WRITE, position, capacity);
return new MappedFile(channel, buffer, 0, length);

View File

@ -62,6 +62,11 @@ public final class MappedSequentialFileFactory extends AbstractSequentialFileFac
@Override
public SequentialFile createSequentialFile(String fileName) {
return createSequentialFile(fileName, capacity);
}
@Override
public SequentialFile createSequentialFile(String fileName, int capacity) {
final MappedSequentialFile mappedSequentialFile = new MappedSequentialFile(this, journalDir, new File(journalDir, fileName), capacity, critialErrorListener);
if (this.timedBuffer == null) {
return mappedSequentialFile;

View File

@ -21,6 +21,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
import org.apache.activemq.artemis.core.journal.impl.JournalFile;
import org.apache.activemq.artemis.core.persistence.Persister;
@ -125,6 +126,14 @@ public interface Journal extends ActiveMQComponent {
void tryAppendUpdateRecord(long id, byte recordType, Persister persister, Object record, JournalUpdateCallback updateCallback, boolean sync, boolean replaceableUpdate) throws Exception;
default IOCriticalErrorListener getCriticalErrorListener() {
return null;
}
default Journal setCriticalErrorListener(IOCriticalErrorListener criticalErrorListener) {
return this;
}
default void appendUpdateRecord(long id,
byte recordType,
EncodingSupport record,

View File

@ -87,13 +87,6 @@ public abstract class AbstractJournalUpdateTask implements JournalReaderCallback
final List<JournalFile> newFiles,
final List<Pair<String, String>> renames) throws Exception {
SequentialFile controlFile = fileFactory.createSequentialFile(AbstractJournalUpdateTask.FILE_COMPACT_CONTROL);
try {
controlFile.open(1, false);
JournalImpl.initFileHeader(fileFactory, controlFile, 0, 0);
ActiveMQBuffer filesToRename = ActiveMQBuffers.dynamicBuffer(1);
// DataFiles first
@ -142,15 +135,22 @@ public abstract class AbstractJournalUpdateTask implements JournalReaderCallback
ByteBuffer writeBuffer = fileFactory.newBuffer(renameBuffer.writerIndex());
writeBuffer.put(renameBuffer.toByteBuffer().array(), 0, renameBuffer.writerIndex());
int position = writeBuffer.position();
writeBuffer.rewind();
controlFile.writeDirect(writeBuffer, true);
return controlFile;
// the capacity here will only be applied to mapped files as they are created with the intended capacity and the control file needs to match the number of files needed
SequentialFile controlFile = fileFactory.createSequentialFile(AbstractJournalUpdateTask.FILE_COMPACT_CONTROL, position + 1024);
try {
controlFile.open(1, false);
JournalImpl.initFileHeader(fileFactory, controlFile, 0, 0);
controlFile.writeDirect(writeBuffer, true);
} finally {
controlFile.close(false, false);
}
return controlFile;
}
static SequentialFile readControlFile(final SequentialFileFactory fileFactory,

View File

@ -408,7 +408,7 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ
if (pendingTransactions.get(transactionID) != null) {
// Sanity check, this should never happen
throw new IllegalStateException("Inconsistency during compacting: RollbackRecord ID = " + transactionID +
logger.debug("Inconsistency during compacting: RollbackRecord ID = " + transactionID +
" for an already rolled back transaction during compacting");
} else {
JournalTransaction newTransaction = newTransactions.remove(transactionID);

View File

@ -293,8 +293,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
// Compacting may replace this structure
private final ConcurrentLongHashMap<JournalTransaction> transactions = new ConcurrentLongHashMap<>();
private final IOCriticalErrorListener criticalErrorListener;
private IOCriticalErrorListener criticalErrorListener;
// This will be set only while the JournalCompactor is being executed
private volatile JournalCompactor compactor;
@ -486,6 +485,17 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
}
}
@Override
public IOCriticalErrorListener getCriticalErrorListener() {
return criticalErrorListener;
}
@Override
public JournalImpl setCriticalErrorListener(IOCriticalErrorListener criticalErrorListener) {
this.criticalErrorListener = criticalErrorListener;
return this;
}
@Override
public ConcurrentLongHashMap<JournalRecord> getRecords() {
return records;
@ -1811,7 +1821,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
* stop, start records will still come as this is being executed
*/
public synchronized void compact() throws Exception {
public synchronized void compact() {
if (compactor != null) {
throw new IllegalStateException("There is pending compacting operation");
@ -1926,6 +1936,13 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
ActiveMQJournalLogger.LOGGER.compactMergeError(newTransaction.getId());
}
});
} catch (Throwable e) {
try {
criticalIO(e);
} catch (Throwable ignored) {
logger.warn(ignored.getMessage(), ignored);
}
return;
} finally {
journalLock.writeLock().unlock();
}
@ -1934,27 +1951,22 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
renameFiles(dataFilesToProcess, newDatafiles);
deleteControlFile(controlFile);
if (logger.isDebugEnabled()) {
logger.debug("Flushing compacting on journal " + this);
}
setAutoReclaim(previousReclaimValue);
if (logger.isDebugEnabled()) {
logger.debug("Finished compacting on journal " + this);
}
} finally {
if (logger.isDebugEnabled()) {
logger.debug("Flushing compacting on journal " + this);
}
// An Exception was probably thrown, and the compactor was not cleared
if (compactor != null) {
} catch (Throwable e) {
try {
compactor.flush();
criticalIO(e);
} catch (Throwable ignored) {
logger.warn(ignored.getMessage(), ignored);
}
compactor = null;
}
if (logger.isDebugEnabled()) {
logger.debug("since compact finished, setAutoReclaim back into " + previousReclaimValue);
}
setAutoReclaim(previousReclaimValue);
}
} finally {
compactorLock.writeLock().unlock();

View File

@ -710,7 +710,12 @@ public final class OpenWireMessageConverter {
private static <T> T getObjectProperty(ICoreMessage message, Class<T> type, SimpleString property) {
if (message.getPropertyNames().contains(property)) {
try {
return type.cast(message.getObjectProperty(property));
Object value = message.getObjectProperty(property);
if (type == String.class && value != null) {
return (T)value.toString();
} else {
return type.cast(value);
}
} catch (ClassCastException e) {
ActiveMQServerLogger.LOGGER.failedToDealWithObjectProperty(property, e.getMessage());
}

View File

@ -275,15 +275,26 @@ public class PostOfficeJournalLoader implements JournalLoader {
public void handleDuplicateIds(Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap) throws Exception {
for (Map.Entry<SimpleString, List<Pair<byte[], Long>>> entry : duplicateIDMap.entrySet()) {
SimpleString address = entry.getKey();
DuplicateIDCache cache = postOffice.getDuplicateIDCache(address);
if (configuration.isPersistIDCache()) {
DuplicateIDCache cache = postOffice.getDuplicateIDCache(address);
cache.load(entry.getValue());
} else {
removeOldDuplicates(entry.getValue());
}
}
}
private void removeOldDuplicates(List<Pair<byte[], Long>> ids) throws Exception {
ids.forEach((pair) -> {
try {
storageManager.deleteDuplicateID(pair.getB());
} catch (Exception e) {
logger.warn(e.getMessage(), e);
}
});
}
@Override
public void postLoad(Journal messageJournal,
ResourceManager resourceManager,

View File

@ -522,6 +522,12 @@ public class NIOJournalCompactTest extends JournalImplTestBase {
}
};
AtomicInteger criticalErrors = new AtomicInteger(0);
journal.setCriticalErrorListener((a, b, c) -> {
System.out.println("Error");
criticalErrors.incrementAndGet();
});
journal.setAutoReclaim(false);
startJournal();
@ -726,6 +732,12 @@ public class NIOJournalCompactTest extends JournalImplTestBase {
startJournal();
loadAndCheck();
if (createControlFile) {
Assert.assertEquals(0, criticalErrors.get());
} else {
Assert.assertEquals(1, criticalErrors.get());
}
}
@Test
@ -1231,6 +1243,73 @@ public class NIOJournalCompactTest extends JournalImplTestBase {
}
@Test
public void testReconfigureJournalSize() throws Exception {
setup(2, 60 * 1024, false);
createJournal();
startJournal();
load();
int NUMBER_OF_RECORDS = 1000;
// add and remove some data to force reclaiming
{
ArrayList<Long> ids = new ArrayList<>();
for (int i = 0; i < NUMBER_OF_RECORDS; i++) {
long id = idGenerator.generateID();
ids.add(id);
add(id);
if (i > 0 && i % 100 == 0) {
journal.forceMoveNextFile();
}
}
journal.forceMoveNextFile();
journal.checkReclaimStatus();
}
journal.testCompact();
stopJournal();
// expanding the size once
setup(2, 120 * 1024, false);
createJournal();
startJournal();
loadAndCheck();
// add and remove some data to force reclaiming
{
ArrayList<Long> ids = new ArrayList<>();
for (int i = 0; i < NUMBER_OF_RECORDS; i++) {
long id = idGenerator.generateID();
ids.add(id);
add(id);
if (i > 0 && i % 100 == 0) {
journal.forceMoveNextFile();
}
}
journal.forceMoveNextFile();
journal.checkReclaimStatus();
}
stopJournal();
// shrinking the size later
setup(2, 30 * 1024, false);
createJournal();
startJournal();
loadAndCheck();
journal.testCompact();
}
@Test
public void testLiveSize() throws Exception {
setup(2, 60 * 1024, true);
@ -1853,6 +1932,29 @@ public class NIOJournalCompactTest extends JournalImplTestBase {
}
@Test
public void testLargeCompacting() throws Exception {
setup(2, 4 * 1024, false);
createJournal();
startJournal();
load();
for (int i = 0; i < 250; i += 2) {
addTx(i, i + 1);
commit(i);
journal.forceMoveNextFile();
}
journal.testCompact();
}
@Override
@After
public void tearDown() throws Exception {

View File

@ -0,0 +1,181 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.openwire;
import javax.jms.Connection;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds;
import org.apache.activemq.artemis.core.server.JournalType;
import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
import org.jboss.logging.Logger;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class CompactingOpenWireTest extends BasicOpenWireTest {
private static final Logger logger = Logger.getLogger(CompactingOpenWireTest.class);
@Override
@Before
public void setUp() throws Exception {
realStore = true;
super.setUp();
System.setProperty("org.apache.activemq.transport.AbstractInactivityMonitor.keepAliveTime", "2");
createFactories();
for (int i = 0; i < 30; i++) {
SimpleString coreQueue = new SimpleString(queueName + i);
this.server.createQueue(new QueueConfiguration(coreQueue).setRoutingType(RoutingType.ANYCAST));
testQueues.put(queueName, coreQueue);
}
}
@Override
protected String getConnectionUrl() {
return "failover:" + urlString;
}
@Override
protected void extraServerConfig(Configuration serverConfig) {
super.extraServerConfig(serverConfig);
serverConfig.setIDCacheSize(500);
serverConfig.setPersistIDCache(true);
serverConfig.setJournalSyncTransactional(false);
serverConfig.setJournalSyncNonTransactional(false);
serverConfig.setJournalFileSize(10 * 1024);
serverConfig.setJournalCompactMinFiles(1);
serverConfig.setJournalCompactPercentage(0);
serverConfig.setJournalType(JournalType.MAPPED);
serverConfig.setJournalBufferTimeout_NIO(0);
}
@Test
public void testTransactCompact() throws Exception {
final int THREADS = 30;
AtomicInteger errors = new AtomicInteger(0);
AtomicBoolean running = new AtomicBoolean(true);
ExecutorService executorService = Executors.newFixedThreadPool(THREADS + 1);
CountDownLatch compactDone = new CountDownLatch(1);
executorService.execute(() -> {
while (running.get()) {
try {
server.getStorageManager().getMessageJournal().scheduleCompactAndBlock(10_000);
} catch (Exception e) {
e.printStackTrace();
errors.incrementAndGet();
}
}
compactDone.countDown();
});
CountDownLatch latchDone = new CountDownLatch(THREADS);
AssertionLoggerHandler.startCapture();
try {
String space1k = new String(new char[5]).replace('\0', ' ');
for (int i = 0; i < THREADS; i++) {
final int id = i % 10;
executorService.submit(new Runnable() {
@Override
public void run() {
try (Connection connection = factory.createConnection()) {
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Session consumerSession = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Queue queue = session.createQueue(queueName + id);
MessageProducer producer = session.createProducer(queue);
MessageConsumer consumer = consumerSession.createConsumer(queue);
connection.start();
for (int j = 0; j < 1000 && running.get(); j++) {
TextMessage textMessage = session.createTextMessage("test");
textMessage.setStringProperty("1k", space1k);
producer.send(textMessage);
if (j % 2 == 0) {
session.commit();
TextMessage message = (TextMessage) consumer.receive(5000);
Assert.assertNotNull(message);
Assert.assertEquals("test", message.getText());
message.acknowledge();
} else {
session.rollback();
}
}
logger.debug("Done! ");
} catch (Throwable t) {
errors.incrementAndGet();
t.printStackTrace();
} finally {
latchDone.countDown();
}
}
});
}
latchDone.await(10, TimeUnit.MINUTES);
running.set(false);
compactDone.await(10, TimeUnit.MINUTES);
executorService.shutdownNow();
Assert.assertEquals(0, errors.get());
Assert.assertFalse(AssertionLoggerHandler.findText("AMQ144003")); // error compacting
Assert.assertFalse(AssertionLoggerHandler.findText("AMQ222055")); // records not found
Assert.assertFalse(AssertionLoggerHandler.findText("AMQ222302")); // string conversion issue
} finally {
AssertionLoggerHandler.stopCapture();
running.set(false);
executorService.shutdownNow();
}
connection.close();
server.stop();
server.getConfiguration().setPersistIDCache(false);
server.getConfiguration().setJournalPoolFiles(2);
server.start();
server.waitForActivation(1, TimeUnit.SECONDS);
server.getStorageManager().getMessageJournal().scheduleCompactAndBlock(60_000);
server.stop();
Map<Integer, AtomicInteger> counts = countJournal(server.getConfiguration());
counts.forEach((a, b) -> System.out.println(a + " = " + b));
AtomicInteger duplicateIDCounts = counts.get((int)JournalRecordIds.DUPLICATE_ID);
Assert.assertTrue("There are duplicate IDs on the journal even though the system was reconfigured to not persist them::" + duplicateIDCounts, duplicateIDCounts == null || duplicateIDCounts.incrementAndGet() == 0);
}
}