diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java index 6790383378..2db07f1d61 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java @@ -1752,7 +1752,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe } gcCandidateSet.remove(dataFileId); if (LOG.isTraceEnabled()) { - LOG.trace("gc candidates after producerSequenceIdTrackerLocation:" + dataFileId + ", " + gcCandidateSet); + LOG.trace("gc candidates after producerSequenceIdTrackerLocation:" + metadata.producerSequenceIdTrackerLocation + ", " + gcCandidateSet); } } @@ -1760,7 +1760,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe int dataFileId = metadata.ackMessageFileMapLocation.getDataFileId(); gcCandidateSet.remove(dataFileId); if (LOG.isTraceEnabled()) { - LOG.trace("gc candidates after ackMessageFileMapLocation:" + dataFileId + ", " + gcCandidateSet); + LOG.trace("gc candidates after ackMessageFileMapLocation:" + metadata.ackMessageFileMapLocation + ", " + gcCandidateSet); } } @@ -1771,7 +1771,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe } } if (LOG.isTraceEnabled()) { - LOG.trace("gc candidates after tx range:" + Arrays.asList(inProgressTxRange) + ", " + gcCandidateSet); + LOG.trace("gc candidates after in progress tx range:" + Arrays.asList(inProgressTxRange) + ", " + gcCandidateSet); } // Go through all the destinations to see if any of them can remove GC candidates. @@ -2038,7 +2038,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe DataFile forwardsFile = journal.reserveDataFile(); forwardsFile.setTypeCode(COMPACTED_JOURNAL_FILE); - LOG.trace("Reserved now file for forwarded acks: {}", forwardsFile); + LOG.trace("Reserved file for forwarded acks: {}", forwardsFile); Map> updatedAckLocations = new HashMap<>(); @@ -2051,8 +2051,9 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe appender.storeItem(payload, Journal.USER_RECORD_TYPE, false); LOG.trace("Marked ack rewrites file as replacing file: {}", journalToRead); - Location nextLocation = getNextLocationForAckForward(new Location(journalToRead, 0)); - while (nextLocation != null && nextLocation.getDataFileId() == journalToRead) { + final Location limit = new Location(journalToRead + 1, 0); + Location nextLocation = getNextLocationForAckForward(new Location(journalToRead, 0), limit); + while (nextLocation != null) { JournalCommand command = null; try { command = load(nextLocation); @@ -2066,7 +2067,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe updatedAckLocations.put(location.getDataFileId(), journalLogsReferenced); } - nextLocation = getNextLocationForAckForward(nextLocation); + nextLocation = getNextLocationForAckForward(nextLocation, limit); } } @@ -2096,13 +2097,13 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe LOG.trace("ACK File Map following updates: {}", metadata.ackMessageFileMap); } - private Location getNextLocationForAckForward(final Location nextLocation) { + private Location getNextLocationForAckForward(final Location nextLocation, final Location limit) { //getNextLocation() can throw an IOException, we should handle it and set //nextLocation to null and abort gracefully //Should not happen in the normal case Location location = null; try { - location = journal.getNextLocation(nextLocation); + location = journal.getNextLocation(nextLocation, limit); } catch (IOException e) { LOG.warn("Failed to load next journal location after: {}, reason: {}", nextLocation, e); if (LOG.isDebugEnabled()) { diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java index bee6cafef3..cd8a84b9ea 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.store.kahadb.disk.journal; +import java.io.EOFException; import java.io.File; import java.io.FileNotFoundException; import java.io.FilenameFilter; @@ -821,6 +822,10 @@ public class Journal { } public Location getNextLocation(Location location) throws IOException, IllegalStateException { + return getNextLocation(location, null); + } + + public Location getNextLocation(Location location, Location limit) throws IOException, IllegalStateException { Location cur = null; while (true) { if (cur == null) { @@ -860,6 +865,10 @@ public class Journal { } else { cur.setDataFileId(dataFile.getDataFileId().intValue()); cur.setOffset(0); + if (limit != null && cur.compareTo(limit) >= 0) { + LOG.trace("reached limit: {} at: {}", limit, cur); + return null; + } } } @@ -867,6 +876,9 @@ public class Journal { DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile); try { reader.readLocationDetails(cur); + } catch (EOFException eof) { + LOG.trace("EOF on next: " + location + ", cur: " + cur); + throw eof; } finally { accessorPool.closeDataFileAccessor(reader); } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6432Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6432Test.java new file mode 100644 index 0000000000..de9dd63b78 --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6432Test.java @@ -0,0 +1,219 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.bugs; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.jmx.QueueViewMBean; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; +import org.apache.activemq.store.kahadb.MessageDatabase; +import org.apache.activemq.util.DefaultTestAppender; +import org.apache.activemq.util.Wait; +import org.apache.log4j.Appender; +import org.apache.log4j.Level; +import org.apache.log4j.spi.LoggingEvent; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.jms.BytesMessage; +import javax.jms.Connection; +import javax.jms.DeliveryMode; +import javax.jms.Destination; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.management.ObjectName; +import java.io.File; +import java.io.FilenameFilter; +import java.util.Arrays; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class AMQ6432Test { + private static final Logger LOG = LoggerFactory.getLogger(AMQ6432Test.class); + + private static final String QUEUE_NAME = "test.queue"; + private BrokerService broker; + + @Before + public void setup() throws Exception { + + broker = new BrokerService(); + broker.setDeleteAllMessagesOnStartup(true); + broker.setPersistent(true); + + KahaDBPersistenceAdapter kahaDB = new KahaDBPersistenceAdapter(); + kahaDB.setJournalMaxFileLength(256 * 1024); + kahaDB.setCleanupInterval(500); + kahaDB.setCompactAcksAfterNoGC(1); + kahaDB.setCompactAcksIgnoresStoreGrowth(true); + broker.setPersistenceAdapter(kahaDB); + + + broker.start(); + broker.waitUntilStarted(); + } + + @After + public void tearDown() throws Exception { + broker.stop(); + } + + @Test + public void testTransactedStoreUsageSuspendResume() throws Exception { + + org.apache.log4j.Logger log4jLogger = + org.apache.log4j.Logger.getLogger(MessageDatabase.class); + final AtomicBoolean failed = new AtomicBoolean(false); + + final File journalDataDir = ((KahaDBPersistenceAdapter) broker.getPersistenceAdapter()).getStore().getJournal().getDirectory(); + + Appender appender = new DefaultTestAppender() { + @Override + public void doAppend(LoggingEvent event) { + if (event.getLevel().equals(Level.WARN) && event.getMessage().toString().startsWith("Failed to load next journal")) { + LOG.info("received unexpected log message: " + event.getMessage()); + failed.set(true); + } + } + }; + log4jLogger.addAppender(appender); + try { + + ExecutorService sendExecutor = Executors.newSingleThreadExecutor(); + sendExecutor.execute(new Runnable() { + @Override + public void run() { + try { + sendReceive(10000); + } catch (Exception ignored) { + } + } + }); + + sendExecutor.execute(new Runnable() { + @Override + public void run() { + try { + sendLargeAndPurge(5000); + } catch (Exception ignored) { + } + } + }); + + sendExecutor.shutdown(); + sendExecutor.awaitTermination(10, TimeUnit.MINUTES); + + + // need to let a few gc cycles to complete then there will be 2 files in the mix and acks will move + TimeUnit.SECONDS.sleep(2); + + assertTrue("gc worked ok", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return ((KahaDBPersistenceAdapter) broker.getPersistenceAdapter()).getStore().getJournal().getFileMap().size() < 3; + } + })); + + } finally { + log4jLogger.removeAppender(appender); + } + assertFalse("failed on unexpected log event", failed.get()); + + sendReceive(500); + + assertTrue("gc worked ok", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return ((KahaDBPersistenceAdapter) broker.getPersistenceAdapter()).getStore().getJournal().getFileMap().size() < 2; + } + })); + + // file actually gone! + LOG.info("Files: " + Arrays.asList(journalDataDir.listFiles())); + assertTrue("Minimum data files in the mix", journalDataDir.listFiles(new FilenameFilter() { + @Override + public boolean accept(File dir, String name) { + return name.startsWith("db-"); + } + }).length == 1); + + + } + + private void sendReceive(int max) throws Exception { + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost"); + factory.setAlwaysSyncSend(true); + Connection connection = factory.createConnection(); + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Destination queue = session.createQueue(QUEUE_NAME+max); + MessageProducer producer = session.createProducer(null); + + producer.setDeliveryMode(DeliveryMode.PERSISTENT); + BytesMessage message = session.createBytesMessage(); + message.writeBytes(new byte[10]); + + MessageConsumer consumer = session.createConsumer(queue); + + for (int i=0; i