mirror of https://github.com/apache/activemq.git
AMQ-7221 - Delete Scheduled messages causes ActiveMQ create/write a unnecessary huge transaction file
This commit is contained in:
parent
f5db964e86
commit
5eb8403b1f
|
@ -22,6 +22,8 @@ import org.apache.activemq.util.ByteSequence;
|
||||||
import org.apache.activemq.store.kahadb.disk.util.DataByteArrayInputStream;
|
import org.apache.activemq.store.kahadb.disk.util.DataByteArrayInputStream;
|
||||||
import org.apache.activemq.store.kahadb.disk.util.DataByteArrayOutputStream;
|
import org.apache.activemq.store.kahadb.disk.util.DataByteArrayOutputStream;
|
||||||
import org.apache.activemq.util.IOHelper;
|
import org.apache.activemq.util.IOHelper;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import java.io.*;
|
import java.io.*;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
|
@ -34,6 +36,8 @@ import java.util.TreeMap;
|
||||||
*/
|
*/
|
||||||
public class Transaction implements Iterable<Page> {
|
public class Transaction implements Iterable<Page> {
|
||||||
|
|
||||||
|
private static final Logger LOG = LoggerFactory.getLogger(Transaction.class);
|
||||||
|
|
||||||
private RandomAccessFile tmpFile;
|
private RandomAccessFile tmpFile;
|
||||||
private File txFile;
|
private File txFile;
|
||||||
private long nextLocation = 0;
|
private long nextLocation = 0;
|
||||||
|
@ -656,6 +660,7 @@ public class Transaction implements Iterable<Page> {
|
||||||
public void commit() throws IOException {
|
public void commit() throws IOException {
|
||||||
if( writeTransactionId!=-1 ) {
|
if( writeTransactionId!=-1 ) {
|
||||||
if (tmpFile != null) {
|
if (tmpFile != null) {
|
||||||
|
LOG.debug("Committing transaction {}: Size {} kb", writeTransactionId, tmpFile.length() / (1024));
|
||||||
pageFile.removeTmpFile(getTempFile(), tmpFile);
|
pageFile.removeTmpFile(getTempFile(), tmpFile);
|
||||||
tmpFile = null;
|
tmpFile = null;
|
||||||
txFile = null;
|
txFile = null;
|
||||||
|
|
|
@ -20,6 +20,7 @@ import java.io.DataInput;
|
||||||
import java.io.DataOutput;
|
import java.io.DataOutput;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.HashMap;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
@ -578,6 +579,9 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
List<Integer> removedJobFileIds = new ArrayList<>();
|
||||||
|
HashMap<Integer, Integer> decrementJournalCount = new HashMap<>();
|
||||||
|
|
||||||
for (Long executionTime : keys) {
|
for (Long executionTime : keys) {
|
||||||
List<JobLocation> values = this.index.remove(tx, executionTime);
|
List<JobLocation> values = this.index.remove(tx, executionTime);
|
||||||
if (location != null) {
|
if (location != null) {
|
||||||
|
@ -586,9 +590,9 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch
|
||||||
|
|
||||||
// Remove the references for add and reschedule commands for this job
|
// Remove the references for add and reschedule commands for this job
|
||||||
// so that those logs can be GC'd when free.
|
// so that those logs can be GC'd when free.
|
||||||
this.store.decrementJournalCount(tx, job.getLocation());
|
decrementJournalCount.compute(job.getLocation().getDataFileId(), (key, value) -> value == null ? 1 : value + 1);
|
||||||
if (job.getLastUpdate() != null) {
|
if (job.getLastUpdate() != null) {
|
||||||
this.store.decrementJournalCount(tx, job.getLastUpdate());
|
decrementJournalCount.compute(job.getLastUpdate().getDataFileId(), (key, value) -> value == null ? 1 : value + 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
// now that the job is removed from the index we can store the remove info and
|
// now that the job is removed from the index we can store the remove info and
|
||||||
|
@ -597,11 +601,19 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch
|
||||||
// the same file we don't need to track it and just let a normal GC of the logs
|
// the same file we don't need to track it and just let a normal GC of the logs
|
||||||
// remove it when the log is unreferenced.
|
// remove it when the log is unreferenced.
|
||||||
if (job.getLocation().getDataFileId() != location.getDataFileId()) {
|
if (job.getLocation().getDataFileId() != location.getDataFileId()) {
|
||||||
this.store.referenceRemovedLocation(tx, location, job);
|
removedJobFileIds.add(job.getLocation().getDataFileId());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (!removedJobFileIds.isEmpty()) {
|
||||||
|
this.store.referenceRemovedLocation(tx, location, removedJobFileIds);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (decrementJournalCount.size() > 0) {
|
||||||
|
this.store.decrementJournalCount(tx, decrementJournalCount);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -492,6 +492,37 @@ public class JobSchedulerStoreImpl extends AbstractKahaDBStore implements JobSch
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Removes multiple references for the Journal log file indicated in the given Location map.
|
||||||
|
*
|
||||||
|
* The references are used to track which log files cannot be GC'd. When the reference count
|
||||||
|
* on a log file reaches zero the file id is removed from the tracker and the log will be
|
||||||
|
* removed on the next check point update.
|
||||||
|
*
|
||||||
|
* @param tx
|
||||||
|
* The TX under which the update is to be performed.
|
||||||
|
* @param decrementsByFileIds
|
||||||
|
* Map indicating how many decrements per fileId.
|
||||||
|
*
|
||||||
|
* @throws IOException if an error occurs while updating the journal references table.
|
||||||
|
*/
|
||||||
|
protected void decrementJournalCount(Transaction tx, HashMap<Integer, Integer> decrementsByFileIds) throws IOException {
|
||||||
|
for(Map.Entry<Integer, Integer> entry : decrementsByFileIds.entrySet()) {
|
||||||
|
int logId = entry.getKey();
|
||||||
|
Integer refCount = metaData.getJournalRC().get(tx, logId);
|
||||||
|
|
||||||
|
if (refCount != null) {
|
||||||
|
int refCountValue = refCount;
|
||||||
|
refCountValue -= entry.getValue();
|
||||||
|
if (refCountValue <= 0) {
|
||||||
|
metaData.getJournalRC().remove(tx, logId);
|
||||||
|
} else {
|
||||||
|
metaData.getJournalRC().put(tx, logId, refCountValue);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Updates the Job removal tracking index with the location of a remove command and the
|
* Updates the Job removal tracking index with the location of a remove command and the
|
||||||
* original JobLocation entry.
|
* original JobLocation entry.
|
||||||
|
@ -519,6 +550,33 @@ public class JobSchedulerStoreImpl extends AbstractKahaDBStore implements JobSch
|
||||||
this.metaData.getRemoveLocationTracker().put(tx, logId, removed);
|
this.metaData.getRemoveLocationTracker().put(tx, logId, removed);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Updates the Job removal tracking index with the location of a remove command and the
|
||||||
|
* original JobLocation entry.
|
||||||
|
*
|
||||||
|
* The JobLocation holds the locations in the logs where the add and update commands for
|
||||||
|
* a job stored. The log file containing the remove command can only be discarded after
|
||||||
|
* both the add and latest update log files have also been discarded.
|
||||||
|
*
|
||||||
|
* @param tx
|
||||||
|
* The TX under which the update is to be performed.
|
||||||
|
* @param location
|
||||||
|
* The location value to reference a remove command.
|
||||||
|
* @param removedJobsFileId
|
||||||
|
* List of the original JobLocation instances that holds the add and update locations
|
||||||
|
*
|
||||||
|
* @throws IOException if an error occurs while updating the remove location tracker.
|
||||||
|
*/
|
||||||
|
protected void referenceRemovedLocation(Transaction tx, Location location, List<Integer> removedJobsFileId) throws IOException {
|
||||||
|
int logId = location.getDataFileId();
|
||||||
|
List<Integer> removed = this.metaData.getRemoveLocationTracker().get(tx, logId);
|
||||||
|
if (removed == null) {
|
||||||
|
removed = new ArrayList<Integer>();
|
||||||
|
}
|
||||||
|
removed.addAll(removedJobsFileId);
|
||||||
|
this.metaData.getRemoveLocationTracker().put(tx, logId, removed);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Retrieve the scheduled Job's byte blob from the journal.
|
* Retrieve the scheduled Job's byte blob from the journal.
|
||||||
*
|
*
|
||||||
|
|
|
@ -23,6 +23,19 @@ import static org.junit.Assert.fail;
|
||||||
|
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import org.apache.activemq.ScheduledMessage;
|
||||||
|
import org.apache.activemq.broker.BrokerService;
|
||||||
|
import org.apache.activemq.command.ActiveMQMessage;
|
||||||
|
import org.apache.activemq.store.kahadb.disk.journal.DataFile;
|
||||||
|
import org.apache.activemq.store.kahadb.disk.page.Transaction;
|
||||||
|
import org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl;
|
||||||
|
import org.apache.log4j.Level;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import javax.jms.Connection;
|
import javax.jms.Connection;
|
||||||
import javax.jms.Destination;
|
import javax.jms.Destination;
|
||||||
|
@ -44,13 +57,23 @@ public class JobSchedulerManagementTest extends JobSchedulerTestSupport {
|
||||||
|
|
||||||
private static final transient Logger LOG = LoggerFactory.getLogger(JobSchedulerManagementTest.class);
|
private static final transient Logger LOG = LoggerFactory.getLogger(JobSchedulerManagementTest.class);
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected BrokerService createBroker() throws Exception {
|
||||||
|
BrokerService brokerService = createBroker(true);
|
||||||
|
((JobSchedulerStoreImpl) brokerService.getJobSchedulerStore()).setCleanupInterval(500);
|
||||||
|
((JobSchedulerStoreImpl) brokerService.getJobSchedulerStore()).setJournalMaxFileLength(100* 1024);
|
||||||
|
return brokerService;
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testRemoveAllScheduled() throws Exception {
|
public void testRemoveAllScheduled() throws Exception {
|
||||||
final int COUNT = 5;
|
org.apache.log4j.Logger.getLogger(Transaction.class).setLevel(Level.DEBUG);
|
||||||
|
final int COUNT = 5000;
|
||||||
|
System.setProperty("maxKahaDBTxSize", "" + (500*1024));
|
||||||
Connection connection = createConnection();
|
Connection connection = createConnection();
|
||||||
|
|
||||||
// Setup the scheduled Message
|
// Setup the scheduled Message
|
||||||
scheduleMessage(connection, TimeUnit.SECONDS.toMillis(6), COUNT);
|
scheduleMessage(connection, TimeUnit.SECONDS.toMillis(180), COUNT);
|
||||||
|
|
||||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
|
||||||
|
@ -79,6 +102,7 @@ public class JobSchedulerManagementTest extends JobSchedulerTestSupport {
|
||||||
// Now wait and see if any get delivered, none should.
|
// Now wait and see if any get delivered, none should.
|
||||||
latch.await(10, TimeUnit.SECONDS);
|
latch.await(10, TimeUnit.SECONDS);
|
||||||
assertEquals(latch.getCount(), COUNT);
|
assertEquals(latch.getCount(), COUNT);
|
||||||
|
assertEquals(1, getNumberOfJournalFiles());
|
||||||
|
|
||||||
connection.close();
|
connection.close();
|
||||||
}
|
}
|
||||||
|
@ -423,4 +447,15 @@ public class JobSchedulerManagementTest extends JobSchedulerTestSupport {
|
||||||
|
|
||||||
producer.close();
|
producer.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private int getNumberOfJournalFiles() throws IOException, InterruptedException {
|
||||||
|
Collection<DataFile> files = ((JobSchedulerStoreImpl) broker.getJobSchedulerStore()).getJournal().getFileMap().values();
|
||||||
|
int reality = 0;
|
||||||
|
for (DataFile file : files) {
|
||||||
|
if (file != null) {
|
||||||
|
reality++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return reality;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue