AMQ-7067 - test and fix for eager ack compaction moving acks from data files with in progress tx and clearing the link from the prepare record, that now uses the same ack map

This commit is contained in:
gtully 2018-10-12 14:51:37 +01:00
parent 3ac3a420a1
commit 7c890d4776
2 changed files with 113 additions and 4 deletions

View File

@ -65,7 +65,6 @@ import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.broker.region.Topic;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.protobuf.Buffer;
@ -2037,7 +2036,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
}
// Check if we found one, or if we only found the current file being written to.
if (journalToAdvance == -1 || journalToAdvance == journal.getCurrentDataFileId()) {
if (journalToAdvance == -1 || blockedFromCompaction(journalToAdvance)) {
return;
}
@ -2077,8 +2076,30 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
}
}
// called with the index lock held
private boolean blockedFromCompaction(int journalToAdvance) {
// don't forward the current data file
if (journalToAdvance == journal.getCurrentDataFileId()) {
return true;
}
// don't forward any data file with inflight transaction records because it will whack the tx - data file link
// in the ack map when all acks are migrated (now that the ack map is not just for acks)
// TODO: prepare records can be dropped but completion records (maybe only commit outcomes) need to be migrated
// as part of the forward work.
Location[] inProgressTxRange = getInProgressTxLocationRange();
if (inProgressTxRange[0] != null) {
for (int pendingTx = inProgressTxRange[0].getDataFileId(); pendingTx <= inProgressTxRange[1].getDataFileId(); pendingTx++) {
if (journalToAdvance == pendingTx) {
LOG.trace("Compaction target:{} blocked by inflight transaction records: {}", journalToAdvance, inProgressTxRange);
return true;
}
}
}
return false;
}
private void forwardAllAcks(Integer journalToRead, Set<Integer> journalLogsReferenced) throws IllegalStateException, IOException {
LOG.trace("Attempting to move all acks in journal:{} to the front.", journalToRead);
LOG.trace("Attempting to move all acks in journal:{} to the front. Referenced files:{}", journalToRead, journalLogsReferenced);
DataFile forwardsFile = journal.reserveDataFile();
forwardsFile.setTypeCode(COMPACTED_JOURNAL_FILE);

View File

@ -13,9 +13,12 @@ import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.broker.jmx.RecoveredXATransactionViewMBean;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.XATransactionId;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.store.kahadb.MessageDatabase;
import org.apache.activemq.util.JMXSupport;
import org.apache.activemq.util.Wait;
import org.apache.commons.lang.StringUtils;
import org.apache.log4j.Level;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@ -114,6 +117,82 @@ public class AMQ7067Test {
((org.apache.activemq.broker.region.Queue) broker.getRegionBroker().getDestinationMap().get(queue)).purge();
Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return 0 == getQueueSize(queue.getQueueName());
}
});
// force gc
broker.getPersistenceAdapter().checkpoint(true);
Xid[] xids = xaRes.recover(TMSTARTRSCAN);
//Should be 1 since we have only 1 prepared
assertEquals(1, xids.length);
connection.close();
broker.stop();
broker.waitUntilStopped();
createBroker();
setupXAConnection();
xids = xaRes.recover(TMSTARTRSCAN);
System.out.println("****** recovered = " + xids);
// THIS SHOULD NOT FAIL AS THERE SHOULD DBE ONLY 1 TRANSACTION!
assertEquals(1, xids.length);
}
@Test
public void testXAPrepareWithAckCompactionDoesNotLooseInflight() throws Exception {
// investigate liner gc issue - store usage not getting released
org.apache.log4j.Logger.getLogger(MessageDatabase.class).setLevel(Level.TRACE);
setupXAConnection();
Queue holdKahaDb = xaSession.createQueue("holdKahaDb");
MessageProducer holdKahaDbProducer = xaSession.createProducer(holdKahaDb);
XATransactionId txid = createXATransaction();
System.out.println("****** create new txid = " + txid);
xaRes.start(txid, TMNOFLAGS);
TextMessage helloMessage = xaSession.createTextMessage(StringUtils.repeat("a", 10));
holdKahaDbProducer.send(helloMessage);
xaRes.end(txid, TMSUCCESS);
Queue queue = xaSession.createQueue("test");
produce(xaRes, xaSession, queue, 100, 512 * 1024);
xaRes.prepare(txid);
produce(xaRes, xaSession, queue, 100, 512 * 1024);
((org.apache.activemq.broker.region.Queue) broker.getRegionBroker().getDestinationMap().get(queue)).purge();
Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return 0 == getQueueSize(queue.getQueueName());
}
});
// force gc, two data files requires two cycles
int limit = ((KahaDBPersistenceAdapter)broker.getPersistenceAdapter()).getCompactAcksAfterNoGC() + 1;
for (int i=0; i<limit*2; i++) {
broker.getPersistenceAdapter().checkpoint(true);
}
// ack compaction task operates in the background
TimeUnit.SECONDS.sleep(5);
Xid[] xids = xaRes.recover(TMSTARTRSCAN);
//Should be 1 since we have only 1 prepared
@ -160,6 +239,16 @@ public class AMQ7067Test {
((org.apache.activemq.broker.region.Queue) broker.getRegionBroker().getDestinationMap().get(queue)).purge();
Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return 0 == getQueueSize(queue.getQueueName());
}
});
// force gc
broker.getPersistenceAdapter().checkpoint(true);
Xid[] xids = xaRes.recover(TMSTARTRSCAN);
//Should be 1 since we have only 1 prepared
@ -343,7 +432,6 @@ public class AMQ7067Test {
TextMessage helloMessage = xaSession.createTextMessage(StringUtils.repeat("a", messageSize));
producer.send(helloMessage);
xaRes.end(txid, TMSUCCESS);
xaRes.prepare(txid);
xaRes.commit(txid, true);
}
}