diff --git a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java index d0465f3dd6..dfcf51acd3 100644 --- a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java +++ b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java @@ -434,6 +434,9 @@ public class AsyncDataManager { purgeList.add(dataFile); } } + if (LOG.isDebugEnabled()) { + LOG.debug("lastFileId=" + lastFile + ", purgeList: (" + purgeList.size() + ") " + purgeList); + } for (DataFile dataFile : purgeList) { forceRemoveDataFile(dataFile); } @@ -465,17 +468,17 @@ public class AsyncDataManager { throws IOException { accessorPool.disposeDataFileAccessors(dataFile); fileByFileMap.remove(dataFile.getFile()); - DataFile removed = fileMap.remove(dataFile.getDataFileId()); storeSize.addAndGet(-dataFile.getLength()); dataFile.unlink(); if (archiveDataLogs) { dataFile.move(getDirectoryArchive()); - LOG.info("moved data file " + dataFile + " to " + LOG.debug("moved data file " + dataFile + " to " + getDirectoryArchive()); } else { boolean result = dataFile.delete(); - LOG.info("discarding data file " + dataFile - + (result ? "successful " : "failed")); + if (!result) { + LOG.info("Failed to discard data file " + dataFile); + } } } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java b/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java index f46a8e81b8..9fc6fb7e0a 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java @@ -261,6 +261,9 @@ public class AMQMessageStore implements MessageStore { data = messages.remove(id); if (data == null) { messageAcks.add(ack); + } else { + // message never got written so datafileReference will still exist + AMQMessageStore.this.peristenceAdapter.removeInProgressDataFile(AMQMessageStore.this, data.getFileId()); } }finally { lock.unlock(); diff --git a/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java b/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java index 5a6a270809..c8d47090e4 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java @@ -26,9 +26,9 @@ import java.util.Iterator; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import org.apache.activeio.journal.Journal; @@ -122,7 +122,7 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener, private int indexMaxBinSize = HashIndex.MAXIMUM_CAPACITY; private int indexLoadFactor = HashIndex.DEFAULT_LOAD_FACTOR; private int maxReferenceFileLength=AMQPersistenceAdapterFactory.DEFAULT_MAX_REFERNCE_FILE_LENGTH; - private Map> dataFilesInProgress = new ConcurrentHashMap> (); + private Map> dataFilesInProgress = new ConcurrentHashMap> (); private String directoryPath = ""; private RandomAccessFile lockFile; private FileLock lock; @@ -271,14 +271,14 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener, checkpoint(false); } }; - Scheduler.executePeriodically(periodicCheckpointTask, checkpointInterval); + Scheduler.executePeriodically(periodicCheckpointTask, getCheckpointInterval()); periodicCleanupTask = new Runnable() { public void run() { cleanup(); } }; - Scheduler.executePeriodically(periodicCleanupTask, cleanupInterval); + Scheduler.executePeriodically(periodicCleanupTask, getCleanupInterval()); if (lockAquired && lockLogged) { LOG.info("Aquired lock for AMQ Store" + getDirectory()); @@ -426,8 +426,11 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener, public void cleanup() { try { SetinProgress = new HashSet(); - for (Set set: dataFilesInProgress.values()) { - inProgress.addAll(set); + if (LOG.isDebugEnabled()) { + LOG.debug("dataFilesInProgress.values: (" + dataFilesInProgress.values().size() + ") " + dataFilesInProgress.values()); + } + for (Map set: dataFilesInProgress.values()) { + inProgress.addAll(set.keySet()); } Integer lastDataFile = asyncDataManager.getCurrentDataFileId(); inProgress.add(lastDataFile); @@ -437,6 +440,7 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener, if (lastActiveTx != null) { lastDataFile = Math.min(lastDataFile, lastActiveTx.getDataFileId()); } + LOG.debug("lastDataFile: " + lastDataFile); asyncDataManager.consolidateDataFilesNotIn(inProgress, lastDataFile - 1); } catch (IOException e) { LOG.error("Could not cleanup data files: " + e, e); @@ -967,18 +971,32 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener, protected void addInProgressDataFile(AMQMessageStore store,int dataFileId) { - Setset = dataFilesInProgress.get(store); - if (set == null) { - set = new CopyOnWriteArraySet(); - dataFilesInProgress.put(store, set); + Map map = dataFilesInProgress.get(store); + if (map == null) { + map = new ConcurrentHashMap(); + dataFilesInProgress.put(store, map); } - set.add(dataFileId); + AtomicInteger count = map.get(dataFileId); + if (count == null) { + count = new AtomicInteger(0); + map.put(dataFileId, count); + } + count.incrementAndGet(); } protected void removeInProgressDataFile(AMQMessageStore store,int dataFileId) { - Setset = dataFilesInProgress.get(store); - if (set != null) { - set.remove(dataFileId); + Map map = dataFilesInProgress.get(store); + if (map != null) { + AtomicInteger count = map.get(dataFileId); + if (count != null) { + int newCount = count.decrementAndGet(); + if (newCount <=0) { + map.remove(dataFileId); + } + } + if (map.isEmpty()) { + dataFilesInProgress.remove(store); + } } } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapterFactory.java b/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapterFactory.java index 259d4370d4..735f572c85 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapterFactory.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapterFactory.java @@ -72,6 +72,7 @@ public class AMQPersistenceAdapterFactory implements PersistenceAdapterFactory { result.setUseNio(isUseNio()); result.setMaxFileLength(getMaxFileLength()); result.setCleanupInterval(getCleanupInterval()); + result.setCheckpointInterval(getCheckpointInterval()); result.setIndexBinSize(getIndexBinSize()); result.setIndexKeySize(getIndexKeySize()); result.setIndexPageSize(getIndexPageSize()); diff --git a/activemq-core/src/test/java/org/apache/activemq/bugs/DataFileNotDeletedTest.java b/activemq-core/src/test/java/org/apache/activemq/bugs/DataFileNotDeletedTest.java new file mode 100644 index 0000000000..9c4dcbf2f7 --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/bugs/DataFileNotDeletedTest.java @@ -0,0 +1,158 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.bugs; + +import java.util.concurrent.CountDownLatch; + +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.ObjectMessage; +import javax.jms.Session; + +import junit.framework.TestCase; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.kaha.impl.async.AsyncDataManager; +import org.apache.activemq.store.amq.AMQPersistenceAdapter; +import org.apache.activemq.store.amq.AMQPersistenceAdapterFactory; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/* + * see https://issues.apache.org/activemq/browse/AMQ-1926 + */ +public class DataFileNotDeletedTest extends TestCase { + + private static final Log LOG = LogFactory.getLog(DataFileNotDeletedTest.class); + + private final CountDownLatch latch = new CountDownLatch(max_messages); + private static int max_messages = 600; + private static int messageCounter; + private String destinationName = getName()+"_Queue"; + private BrokerService broker; + private Connection receiverConnection; + private Connection producerConnection; + final boolean useTopic = false; + + AMQPersistenceAdapter persistentAdapter; + protected static final String payload = new String(new byte[512]); + + public void setUp() throws Exception { + messageCounter = 0; + startBroker(); + receiverConnection = createConnection(); + receiverConnection.start(); + producerConnection = createConnection(); + producerConnection.start(); + } + + public void tearDown() throws Exception { + receiverConnection.close(); + producerConnection.close(); + broker.stop(); + } + + public void testForDataFileNotDeleted() throws Exception { + doTestForDataFileNotDeleted(false); + } + + public void testForDataFileNotDeletedTransacted() throws Exception { + doTestForDataFileNotDeleted(true); + } + + private void doTestForDataFileNotDeleted(boolean transacted) throws Exception { + + Receiver receiver = new Receiver() { + public void receive(String s) throws Exception { + messageCounter++; + latch.countDown(); + } + }; + buildReceiver(receiverConnection, destinationName, transacted, receiver, useTopic); + + final MessageSender producer = new MessageSender(destinationName, producerConnection, transacted, useTopic); + for (int i=0; i< max_messages; i++) { + producer.send(payload ); + } + latch.await(); + assertEquals(max_messages, messageCounter); + waitFordataFilesToBeCleanedUp(persistentAdapter.getAsyncDataManager(), 30000, 2); + } + + private void waitFordataFilesToBeCleanedUp( + AsyncDataManager asyncDataManager, int timeout, int numExpected) throws InterruptedException { + long expiry = System.currentTimeMillis() + timeout; + while(expiry > System.currentTimeMillis()) { + if (asyncDataManager.getFiles().size() <= numExpected) { + break; + } else { + Thread.sleep(1000); + } + } + assertEquals("persistence adapter dataManager has correct number of files", 2, asyncDataManager.getFiles().size()); + } + + private Connection createConnection() throws JMSException { + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616"); + return factory.createConnection(); + } + + private void startBroker() throws Exception { + broker = new BrokerService(); + broker.setDeleteAllMessagesOnStartup(true); + broker.setPersistent(true); + broker.setUseJmx(true); + broker.addConnector("tcp://localhost:61616").setName("Default"); + + AMQPersistenceAdapterFactory factory = (AMQPersistenceAdapterFactory) broker.getPersistenceFactory(); + // ensure there are a bunch of data files but multiple entries in each + factory.setMaxFileLength(1024 * 20); + // speed up the test case, checkpoint an cleanup early and often + factory.setCheckpointInterval(500); + factory.setCleanupInterval(500); + factory.setSyncOnWrite(false); + + persistentAdapter = (AMQPersistenceAdapter) broker.getPersistenceAdapter(); + broker.start(); + LOG.info("Starting broker.."); + } + + private void buildReceiver(Connection connection, final String queueName, boolean transacted, final Receiver receiver, boolean isTopic) throws Exception { + final Session session = transacted ? connection.createSession(true, Session.SESSION_TRANSACTED) : connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer inputMessageConsumer = session.createConsumer(isTopic ? session.createTopic(queueName) : session.createQueue(queueName)); + MessageListener messageListener = new MessageListener() { + + public void onMessage(Message message) { + try { + ObjectMessage objectMessage = (ObjectMessage)message; + String s = (String)objectMessage.getObject(); + receiver.receive(s); + if (session.getTransacted()) { + session.commit(); + } + } catch (Exception e) { + e.printStackTrace(); + } + } + }; + inputMessageConsumer.setMessageListener(messageListener); + } +}