diff --git a/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2736Test.java b/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2736Test.java index c20a98d4ad..e5845725ad 100644 --- a/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2736Test.java +++ b/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2736Test.java @@ -54,7 +54,7 @@ public class AMQ2736Test { KahaDBPersistenceAdapter pa = (KahaDBPersistenceAdapter) broker.getPersistenceAdapter(); KahaDBStore store = pa.getStore(); - assertNotNull("last tx location is present " + store.getFirstInProgressTxLocation()); + assertNotNull("last tx location is present " + store.getInProgressTxLocationRange()[1]); // test hack, close the journal to ensure no further journal updates when broker stops // mimic kill -9 in terms of no normal shutdown sequence @@ -74,7 +74,7 @@ public class AMQ2736Test { store = pa.getStore(); // inflight non xa tx should be rolledback on recovery - assertNull("in progress tx location is present ", store.getFirstInProgressTxLocation()); + assertNull("in progress tx location is present ", store.getInProgressTxLocationRange()[0]); } diff --git a/activemq-core/src/test/java/org/apache/activemq/bugs/TransactedStoreUsageSuspendResumeTest.java b/activemq-core/src/test/java/org/apache/activemq/bugs/TransactedStoreUsageSuspendResumeTest.java new file mode 100644 index 0000000000..f2910a2745 --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/bugs/TransactedStoreUsageSuspendResumeTest.java @@ -0,0 +1,185 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.bugs; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import javax.jms.BytesMessage; +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.DeliveryMode; +import javax.jms.Destination; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +import static org.junit.Assert.assertTrue; + +public class TransactedStoreUsageSuspendResumeTest { + private static final Logger LOG = LoggerFactory.getLogger(TransactedStoreUsageSuspendResumeTest.class); + + private static final int MAX_MESSAGES = 10000; + + private static final String QUEUE_NAME = "test.queue"; + + private BrokerService broker; + + private final CountDownLatch messagesReceivedCountDown = new CountDownLatch(MAX_MESSAGES); + private final CountDownLatch messagesSentCountDown = new CountDownLatch(MAX_MESSAGES); + private final CountDownLatch consumerStartLatch = new CountDownLatch(1); + + private class ConsumerThread extends Thread { + + @Override + public void run() { + try { + + consumerStartLatch.await(30, TimeUnit.SECONDS); + + ConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost"); + Connection connection = factory.createConnection(); + connection.start(); + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + + // wait for producer to stop + long currentSendCount; + do { + currentSendCount = messagesSentCountDown.getCount(); + TimeUnit.SECONDS.sleep(5); + } while (currentSendCount != messagesSentCountDown.getCount()); + + LOG.info("Starting consumer at: " + currentSendCount); + + MessageConsumer consumer = session.createConsumer(session.createQueue(QUEUE_NAME)); + + do { + Message message = consumer.receive(1000); + if (message != null) { + session.commit(); + messagesReceivedCountDown.countDown(); + } + if (messagesReceivedCountDown.getCount() % 500 == 0) { + LOG.info("remaining to receive: " + messagesReceivedCountDown.getCount()); + } + } while (messagesReceivedCountDown.getCount() != 0); + consumer.close(); + session.close(); + connection.close(); + } catch (Exception e) { + Assert.fail(e.getMessage()); + } + } + } + + @Before + public void setup() throws Exception { + + broker = new BrokerService(); + broker.setDeleteAllMessagesOnStartup(true); + broker.setPersistent(true); + + KahaDBPersistenceAdapter kahaDB = new KahaDBPersistenceAdapter(); + kahaDB.setJournalMaxFileLength(500 * 1024); + kahaDB.setCleanupInterval(10*1000); + broker.setPersistenceAdapter(kahaDB); + + broker.getSystemUsage().getStoreUsage().setLimit(7*1024*1024); + + broker.start(); + broker.waitUntilStarted(); + } + + @After + public void tearDown() throws Exception { + broker.stop(); + } + + @Test + public void testTransactedStoreUsageSuspendResume() throws Exception { + + ConsumerThread thread = new ConsumerThread(); + thread.start(); + ExecutorService sendExecutor = Executors.newSingleThreadExecutor(); + sendExecutor.execute(new Runnable() { + @Override + public void run() { + try { + sendMessages(); + } catch (Exception ignored) { + } + } + }); + sendExecutor.shutdown(); + sendExecutor.awaitTermination(5, TimeUnit.MINUTES); + + boolean allMessagesReceived = messagesReceivedCountDown.await(120, TimeUnit.SECONDS); + assertTrue("Got all messages: " + messagesReceivedCountDown, allMessagesReceived); + } + + private void sendMessages() throws Exception { + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost"); + factory.setAlwaysSyncSend(true); + Connection connection = factory.createConnection(); + connection.start(); + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + Destination queue = session.createQueue(QUEUE_NAME); + Destination retainQueue = session.createQueue(QUEUE_NAME + "-retain"); + MessageProducer producer = session.createProducer(null); + + producer.setDeliveryMode(DeliveryMode.PERSISTENT); + BytesMessage message = session.createBytesMessage(); + message.writeBytes(new byte[10]); + + for (int i=0; i<4240; i++) { + // mostly fill the store with retained messages + // so consumer only has a small bit of store usage to work with + producer.send(retainQueue, message); + session.commit(); + } + + consumerStartLatch.countDown(); + for (int i = 0; i < MAX_MESSAGES; i++) { + producer.send(queue, message); + if (i>0 && i%20 == 0) { + session.commit(); + } + messagesSentCountDown.countDown(); + if (i>0 && i%500 == 0) { + LOG.info("Sent : " + i); + } + + } + session.commit(); + producer.close(); + session.close(); + connection.close(); + } + +} diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java index a27c45ff4a..e4cf059b1a 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java @@ -29,6 +29,7 @@ import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.io.OutputStream; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.Date; @@ -421,7 +422,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe try { if( pageFile != null && pageFile.isLoaded() ) { metadata.state = CLOSED_STATE; - metadata.firstInProgressTransactionLocation = getFirstInProgressTxLocation(); + metadata.firstInProgressTransactionLocation = getInProgressTxLocationRange()[0]; if (metadata.page != null) { pageFile.tx().execute(new Transaction.Closure() { @@ -440,30 +441,36 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe // public for testing @SuppressWarnings("rawtypes") - public Location getFirstInProgressTxLocation() { - Location l = null; + public Location[] getInProgressTxLocationRange() { + Location[] range = new Location[]{null, null}; synchronized (inflightTransactions) { if (!inflightTransactions.isEmpty()) { for (List ops : inflightTransactions.values()) { if (!ops.isEmpty()) { - l = ops.get(0).getLocation(); - break; + trackMaxAndMin(range, ops); } } } if (!preparedTransactions.isEmpty()) { for (List ops : preparedTransactions.values()) { if (!ops.isEmpty()) { - Location t = ops.get(0).getLocation(); - if (l==null || t.compareTo(l) <= 0) { - l = t; - } - break; + trackMaxAndMin(range, ops); } } } } - return l; + return range; + } + + private void trackMaxAndMin(Location[] range, List ops) { + Location t = ops.get(0).getLocation(); + if (range[0]==null || t.compareTo(range[0]) <= 0) { + range[0] = t; + } + t = ops.get(ops.size() -1).getLocation(); + if (range[1]==null || t.compareTo(range[1]) >= 0) { + range[1] = t; + } } class TranInfo { @@ -1385,11 +1392,12 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe LOG.debug("Checkpoint started."); // reflect last update exclusive of current checkpoint - Location firstTxLocation = metadata.lastUpdate; + Location lastUpdate = metadata.lastUpdate; metadata.state = OPEN_STATE; metadata.producerSequenceIdTrackerLocation = checkpointProducerAudit(); - metadata.firstInProgressTransactionLocation = getFirstInProgressTxLocation(); + Location[] inProgressTxRange = getInProgressTxLocationRange(); + metadata.firstInProgressTransactionLocation = inProgressTxRange[0]; tx.store(metadata.page, metadataMarshaller, true); pageFile.flush(); @@ -1399,7 +1407,11 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe final TreeSet gcCandidateSet = new TreeSet(completeFileSet); if (LOG.isTraceEnabled()) { - LOG.trace("Last update: " + firstTxLocation + ", full gc candidates set: " + gcCandidateSet); + LOG.trace("Last update: " + lastUpdate + ", full gc candidates set: " + gcCandidateSet); + } + + if (lastUpdate != null) { + gcCandidateSet.remove(lastUpdate.getDataFileId()); } // Don't GC files under replication @@ -1411,25 +1423,14 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe gcCandidateSet.remove(metadata.producerSequenceIdTrackerLocation.getDataFileId()); } - // Don't GC files after the first in progress tx - if( metadata.firstInProgressTransactionLocation!=null ) { - if (metadata.firstInProgressTransactionLocation.getDataFileId() < firstTxLocation.getDataFileId()) { - firstTxLocation = metadata.firstInProgressTransactionLocation; + // Don't GC files referenced by in-progress tx + if (inProgressTxRange[0] != null) { + for (int pendingTx=inProgressTxRange[0].getDataFileId(); pendingTx <= inProgressTxRange[1].getDataFileId(); pendingTx++) { + gcCandidateSet.remove(pendingTx); } } - - if( firstTxLocation!=null ) { - while( !gcCandidateSet.isEmpty() ) { - Integer last = gcCandidateSet.last(); - if( last >= firstTxLocation.getDataFileId() ) { - gcCandidateSet.remove(last); - } else { - break; - } - } - if (LOG.isTraceEnabled()) { - LOG.trace("gc candidates after first tx:" + firstTxLocation + ", " + gcCandidateSet); - } + if (LOG.isTraceEnabled()) { + LOG.trace("gc candidates after tx range:" + Arrays.asList(inProgressTxRange) + ", " + gcCandidateSet); } // Go through all the destinations to see if any of them can remove GC candidates.