From 5eb8403b1f77f1583942c59447b2514b4eda9b9c Mon Sep 17 00:00:00 2001 From: Alan Protasio Date: Mon, 3 Jun 2019 23:57:21 -0700 Subject: [PATCH] AMQ-7221 - Delete Scheduled messages causes ActiveMQ create/write a unnecessary huge transaction file --- .../store/kahadb/disk/page/Transaction.java | 5 ++ .../kahadb/scheduler/JobSchedulerImpl.java | 18 +++++- .../scheduler/JobSchedulerStoreImpl.java | 58 +++++++++++++++++++ .../scheduler/JobSchedulerManagementTest.java | 39 ++++++++++++- 4 files changed, 115 insertions(+), 5 deletions(-) diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/page/Transaction.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/page/Transaction.java index 52b2f99747..c91020c22d 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/page/Transaction.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/page/Transaction.java @@ -22,6 +22,8 @@ import org.apache.activemq.util.ByteSequence; import org.apache.activemq.store.kahadb.disk.util.DataByteArrayInputStream; import org.apache.activemq.store.kahadb.disk.util.DataByteArrayOutputStream; import org.apache.activemq.util.IOHelper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.*; import java.util.Iterator; @@ -34,6 +36,8 @@ import java.util.TreeMap; */ public class Transaction implements Iterable { + private static final Logger LOG = LoggerFactory.getLogger(Transaction.class); + private RandomAccessFile tmpFile; private File txFile; private long nextLocation = 0; @@ -656,6 +660,7 @@ public class Transaction implements Iterable { public void commit() throws IOException { if( writeTransactionId!=-1 ) { if (tmpFile != null) { + LOG.debug("Committing transaction {}: Size {} kb", writeTransactionId, tmpFile.length() / (1024)); pageFile.removeTmpFile(getTempFile(), tmpFile); tmpFile = null; txFile = null; diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java index 4cbcc30aed..5889c6ab95 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java @@ -20,6 +20,7 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.util.ArrayList; +import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -578,6 +579,9 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch } } + List removedJobFileIds = new ArrayList<>(); + HashMap decrementJournalCount = new HashMap<>(); + for (Long executionTime : keys) { List values = this.index.remove(tx, executionTime); if (location != null) { @@ -586,9 +590,9 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch // Remove the references for add and reschedule commands for this job // so that those logs can be GC'd when free. - this.store.decrementJournalCount(tx, job.getLocation()); + decrementJournalCount.compute(job.getLocation().getDataFileId(), (key, value) -> value == null ? 1 : value + 1); if (job.getLastUpdate() != null) { - this.store.decrementJournalCount(tx, job.getLastUpdate()); + decrementJournalCount.compute(job.getLastUpdate().getDataFileId(), (key, value) -> value == null ? 1 : value + 1); } // now that the job is removed from the index we can store the remove info and @@ -597,11 +601,19 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch // the same file we don't need to track it and just let a normal GC of the logs // remove it when the log is unreferenced. if (job.getLocation().getDataFileId() != location.getDataFileId()) { - this.store.referenceRemovedLocation(tx, location, job); + removedJobFileIds.add(job.getLocation().getDataFileId()); } } } } + + if (!removedJobFileIds.isEmpty()) { + this.store.referenceRemovedLocation(tx, location, removedJobFileIds); + } + + if (decrementJournalCount.size() > 0) { + this.store.decrementJournalCount(tx, decrementJournalCount); + } } /** diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerStoreImpl.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerStoreImpl.java index 5cd46291a7..3ec3472e86 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerStoreImpl.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerStoreImpl.java @@ -492,6 +492,37 @@ public class JobSchedulerStoreImpl extends AbstractKahaDBStore implements JobSch } } + /** + * Removes multiple references for the Journal log file indicated in the given Location map. + * + * The references are used to track which log files cannot be GC'd. When the reference count + * on a log file reaches zero the file id is removed from the tracker and the log will be + * removed on the next check point update. + * + * @param tx + * The TX under which the update is to be performed. + * @param decrementsByFileIds + * Map indicating how many decrements per fileId. + * + * @throws IOException if an error occurs while updating the journal references table. + */ + protected void decrementJournalCount(Transaction tx, HashMap decrementsByFileIds) throws IOException { + for(Map.Entry entry : decrementsByFileIds.entrySet()) { + int logId = entry.getKey(); + Integer refCount = metaData.getJournalRC().get(tx, logId); + + if (refCount != null) { + int refCountValue = refCount; + refCountValue -= entry.getValue(); + if (refCountValue <= 0) { + metaData.getJournalRC().remove(tx, logId); + } else { + metaData.getJournalRC().put(tx, logId, refCountValue); + } + } + } + } + /** * Updates the Job removal tracking index with the location of a remove command and the * original JobLocation entry. @@ -519,6 +550,33 @@ public class JobSchedulerStoreImpl extends AbstractKahaDBStore implements JobSch this.metaData.getRemoveLocationTracker().put(tx, logId, removed); } + /** + * Updates the Job removal tracking index with the location of a remove command and the + * original JobLocation entry. + * + * The JobLocation holds the locations in the logs where the add and update commands for + * a job stored. The log file containing the remove command can only be discarded after + * both the add and latest update log files have also been discarded. + * + * @param tx + * The TX under which the update is to be performed. + * @param location + * The location value to reference a remove command. + * @param removedJobsFileId + * List of the original JobLocation instances that holds the add and update locations + * + * @throws IOException if an error occurs while updating the remove location tracker. + */ + protected void referenceRemovedLocation(Transaction tx, Location location, List removedJobsFileId) throws IOException { + int logId = location.getDataFileId(); + List removed = this.metaData.getRemoveLocationTracker().get(tx, logId); + if (removed == null) { + removed = new ArrayList(); + } + removed.addAll(removedJobsFileId); + this.metaData.getRemoveLocationTracker().put(tx, logId, removed); + } + /** * Retrieve the scheduled Job's byte blob from the journal. * diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerManagementTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerManagementTest.java index 6f6dc76b8f..5d80c0558b 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerManagementTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerManagementTest.java @@ -23,6 +23,19 @@ import static org.junit.Assert.fail; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.io.IOException; +import java.util.Collection; +import java.util.concurrent.TimeUnit; +import org.apache.activemq.ScheduledMessage; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.command.ActiveMQMessage; +import org.apache.activemq.store.kahadb.disk.journal.DataFile; +import org.apache.activemq.store.kahadb.disk.page.Transaction; +import org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl; +import org.apache.log4j.Level; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import javax.jms.Connection; import javax.jms.Destination; @@ -44,13 +57,23 @@ public class JobSchedulerManagementTest extends JobSchedulerTestSupport { private static final transient Logger LOG = LoggerFactory.getLogger(JobSchedulerManagementTest.class); + @Override + protected BrokerService createBroker() throws Exception { + BrokerService brokerService = createBroker(true); + ((JobSchedulerStoreImpl) brokerService.getJobSchedulerStore()).setCleanupInterval(500); + ((JobSchedulerStoreImpl) brokerService.getJobSchedulerStore()).setJournalMaxFileLength(100* 1024); + return brokerService; + } + @Test public void testRemoveAllScheduled() throws Exception { - final int COUNT = 5; + org.apache.log4j.Logger.getLogger(Transaction.class).setLevel(Level.DEBUG); + final int COUNT = 5000; + System.setProperty("maxKahaDBTxSize", "" + (500*1024)); Connection connection = createConnection(); // Setup the scheduled Message - scheduleMessage(connection, TimeUnit.SECONDS.toMillis(6), COUNT); + scheduleMessage(connection, TimeUnit.SECONDS.toMillis(180), COUNT); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); @@ -79,6 +102,7 @@ public class JobSchedulerManagementTest extends JobSchedulerTestSupport { // Now wait and see if any get delivered, none should. latch.await(10, TimeUnit.SECONDS); assertEquals(latch.getCount(), COUNT); + assertEquals(1, getNumberOfJournalFiles()); connection.close(); } @@ -423,4 +447,15 @@ public class JobSchedulerManagementTest extends JobSchedulerTestSupport { producer.close(); } + + private int getNumberOfJournalFiles() throws IOException, InterruptedException { + Collection files = ((JobSchedulerStoreImpl) broker.getJobSchedulerStore()).getJournal().getFileMap().values(); + int reality = 0; + for (DataFile file : files) { + if (file != null) { + reality++; + } + } + return reality; + } }