From 172200174819bc6ec54e9a245e2280dfdd730687 Mon Sep 17 00:00:00 2001 From: Robert Davies Date: Mon, 15 Oct 2007 19:31:22 +0000 Subject: [PATCH] fix data logs not being correctly removed git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@584863 13f79535-47bb-0310-9956-ffa450edef68 --- .../kaha/impl/async/AsyncDataManager.java | 33 ++++++++----- .../activemq/kaha/impl/async/DataFile.java | 4 ++ .../kaha/impl/async/DataFileAccessorPool.java | 11 ++--- .../activemq/store/TopicMessageStore.java | 3 +- .../activemq/store/TopicReferenceStore.java | 7 ++- .../activemq/store/amq/AMQMessageStore.java | 33 ++++++++----- .../store/amq/AMQPersistenceAdapter.java | 2 +- .../store/amq/AMQTopicMessageStore.java | 43 +++++----------- .../store/kahadaptor/KahaReferenceStore.java | 3 +- .../kahadaptor/KahaTopicReferenceStore.java | 49 +++++++++++++++++-- .../store/kahadaptor/TopicSubContainer.java | 16 ++++++ 11 files changed, 133 insertions(+), 71 deletions(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java index b8895a9e74..97e86a28be 100644 --- a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java +++ b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java @@ -270,6 +270,12 @@ public final class AsyncDataManager { storeSize.addAndGet(size); return currentWriteFile; } + + public synchronized void removeLocation(Location location) throws IOException{ + + DataFile dataFile = getDataFile(location); + dataFile.decrement(); + } DataFile getDataFile(Location item) throws IOException { Integer key = Integer.valueOf(item.getDataFileId()); @@ -346,6 +352,7 @@ public final class AsyncDataManager { synchronized void addInterestInFile(DataFile dataFile) { if (dataFile != null) { dataFile.increment(); + System.err.println("ADD INTEREST: " + dataFile); } } @@ -355,6 +362,7 @@ public final class AsyncDataManager { DataFile dataFile = (DataFile)fileMap.get(key); removeInterestInFile(dataFile); } + } synchronized void removeInterestInFile(DataFile dataFile) throws IOException { @@ -362,24 +370,20 @@ public final class AsyncDataManager { if (dataFile.decrement() <= 0) { removeDataFile(dataFile); } + System.err.println("REMOVE INTEREST: " + dataFile); } } public synchronized void consolidateDataFilesNotIn(Set inUse) throws IOException { - - // Substract and the difference is the set of files that are no longer - // needed :) Set unUsed = new HashSet(fileMap.keySet()); unUsed.removeAll(inUse); - List purgeList = new ArrayList(); for (Integer key : unUsed) { DataFile dataFile = (DataFile)fileMap.get(key); purgeList.add(dataFile); } - for (DataFile dataFile : purgeList) { - removeDataFile(dataFile); + forceRemoveDataFile(dataFile); } } @@ -399,16 +403,20 @@ public final class AsyncDataManager { // Make sure we don't delete too much data. if (dataFile == currentWriteFile || mark == null || dataFile.getDataFileId() >= mark.getDataFileId()) { - return; + LOG.debug("Won't remove DataFile" + dataFile); + return; } - + forceRemoveDataFile(dataFile); + } + + private synchronized void forceRemoveDataFile(DataFile dataFile) throws IOException { accessorPool.disposeDataFileAccessors(dataFile); - - fileMap.remove(dataFile.getDataFileId()); + DataFile removed = fileMap.remove(dataFile.getDataFileId()); storeSize.addAndGet(-dataFile.getLength()); dataFile.unlink(); boolean result = dataFile.delete(); - LOG.debug("discarding data file " + dataFile + (result ? "successful " : "failed")); + LOG.debug("discarding data file " + dataFile + + (result ? "successful " : "failed")); } @@ -519,7 +527,8 @@ public final class AsyncDataManager { } public synchronized Location write(ByteSequence data, boolean sync) throws IOException, IllegalStateException { - return appender.storeItem(data, Location.USER_TYPE, sync); + Location loc = appender.storeItem(data, Location.USER_TYPE, sync); + return loc; } public synchronized Location write(ByteSequence data, byte type, boolean sync) throws IOException, IllegalStateException { diff --git a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFile.java b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFile.java index 621d7fc318..f47bb82d5b 100644 --- a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFile.java +++ b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFile.java @@ -66,6 +66,10 @@ class DataFile extends LinkedNode implements Comparable { public synchronized int decrement() { return --referenceCount; } + + public synchronized int getReferenceCount(){ + return referenceCount; + } public synchronized boolean isUnused() { return referenceCount <= 0; diff --git a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAccessorPool.java b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAccessorPool.java index ee7e80f912..b93bc27970 100644 --- a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAccessorPool.java +++ b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAccessorPool.java @@ -59,9 +59,8 @@ public class DataFileAccessorPool { return rc; } - public void closeDataFileReader(DataFileAccessor reader) { + public synchronized void closeDataFileReader(DataFileAccessor reader) { openCounter--; - used = true; if (pool.size() >= maxOpenReadersPerFile || disposed) { reader.dispose(); } else { @@ -69,15 +68,15 @@ public class DataFileAccessorPool { } } - public void clearUsedMark() { + public synchronized void clearUsedMark() { used = false; } - public boolean isUsed() { + public synchronized boolean isUsed() { return used; } - public void dispose() { + public synchronized void dispose() { for (DataFileAccessor reader : pool) { reader.dispose(); } @@ -85,7 +84,7 @@ public class DataFileAccessorPool { disposed = true; } - public int getOpenCounter() { + public synchronized int getOpenCounter() { return openCounter; } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/TopicMessageStore.java b/activemq-core/src/main/java/org/apache/activemq/store/TopicMessageStore.java index 3a101dcabc..44838f985b 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/TopicMessageStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/TopicMessageStore.java @@ -17,9 +17,10 @@ package org.apache.activemq.store; import java.io.IOException; + import javax.jms.JMSException; + import org.apache.activemq.broker.ConnectionContext; -import org.apache.activemq.command.Message; import org.apache.activemq.command.MessageId; import org.apache.activemq.command.SubscriptionInfo; diff --git a/activemq-core/src/main/java/org/apache/activemq/store/TopicReferenceStore.java b/activemq-core/src/main/java/org/apache/activemq/store/TopicReferenceStore.java index 1e382e7aa7..4e9b67f69f 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/TopicReferenceStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/TopicReferenceStore.java @@ -31,17 +31,20 @@ import org.apache.activemq.command.SubscriptionInfo; */ public interface TopicReferenceStore extends ReferenceStore, TopicMessageStore { /** - * Stores the last acknowledged messgeID for the given subscription so that + * Removes the last acknowledged messgeID for the given subscription so that * we can recover and commence dispatching messages from the last checkpoint + * N.B. - all messages previous to this one for a given subscriber + * should also be acknowledged * * @param context * @param clientId * @param subscriptionName * @param messageId * @param subscriptionPersistentId + * @return true if there are no more references to the message - or the message is null * @throws IOException */ - void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId) throws IOException; + boolean acknowledgeReference(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId) throws IOException; /** * @param clientId diff --git a/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java b/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java index a7ea428ebe..d69bbb2688 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java @@ -180,7 +180,7 @@ public class AMQMessageStore implements MessageStore { /** */ - public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException { + public void removeMessage(final ConnectionContext context, final MessageAck ack) throws IOException { JournalQueueAck remove = new JournalQueueAck(); remove.setDestination(destination); remove.setMessageAck(ack); @@ -189,7 +189,7 @@ public class AMQMessageStore implements MessageStore { if (debug) { LOG.debug("Journalled message remove for: " + ack.getLastMessageId() + ", at: " + location); } - removeMessage(ack, location); + removeMessage(ack,location); } else { if (debug) { LOG.debug("Journalled transacted message remove for: " + ack.getLastMessageId() + ", at: " + location); @@ -206,7 +206,7 @@ public class AMQMessageStore implements MessageStore { } synchronized (AMQMessageStore.this) { inFlightTxLocations.remove(location); - removeMessage(ack, location); + removeMessage(ack,location); } } @@ -240,7 +240,7 @@ public class AMQMessageStore implements MessageStore { } } } - + public boolean replayRemoveMessage(ConnectionContext context, MessageAck messageAck) { try { // Only remove the message if it has not already been removed. @@ -378,16 +378,28 @@ public class AMQMessageStore implements MessageStore { * */ public Message getMessage(MessageId identity) throws IOException { + Location location = getLocation(identity); + DataStructure rc = peristenceAdapter.readCommand(location); + try { + return (Message) rc; + } catch (ClassCastException e) { + throw new IOException("Could not read message " + identity + + " at location " + location + + ", expected a message, but got: " + rc); + } + } + + protected Location getLocation(MessageId messageId) throws IOException { ReferenceData data = null; synchronized (this) { // Is it still in flight??? - data = messages.get(identity); + data = messages.get(messageId); if (data == null && cpAddedMessageIds != null) { - data = cpAddedMessageIds.get(identity); + data = cpAddedMessageIds.get(messageId); } } if (data == null) { - data = referenceStore.getMessageReference(identity); + data = referenceStore.getMessageReference(messageId); if (data == null) { return null; } @@ -395,12 +407,7 @@ public class AMQMessageStore implements MessageStore { Location location = new Location(); location.setDataFileId(data.getFileId()); location.setOffset(data.getOffset()); - DataStructure rc = peristenceAdapter.readCommand(location); - try { - return (Message)rc; - } catch (ClassCastException e) { - throw new IOException("Could not read message " + identity + " at location " + location + ", expected a message, but got: " + rc); - } + return location; } /** diff --git a/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java b/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java index cbf779ce54..b0e3105ea2 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java @@ -83,7 +83,7 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener, private TaskRunnerFactory taskRunnerFactory; private WireFormat wireFormat = new OpenWireFormat(); private SystemUsage usageManager; - private long cleanupInterval = 1000 * 60; + private long cleanupInterval = 1000 * 15; private long checkpointInterval = 1000 * 10; private int maxCheckpointWorkers = 1; private int maxCheckpointMessageAddSize = 1024 * 4; diff --git a/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java b/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java index 62d27c2bfa..b14785aeb2 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java @@ -25,6 +25,7 @@ import java.util.Map; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.command.JournalTopicAck; +import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageId; import org.apache.activemq.command.SubscriptionInfo; import org.apache.activemq.kaha.impl.async.Location; @@ -77,7 +78,7 @@ public class AMQTopicMessageStore extends AMQMessageStore implements TopicMessag /** */ - public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, final MessageId messageId) throws IOException { + public void acknowledge(final ConnectionContext context, final String clientId, final String subscriptionName, final MessageId messageId) throws IOException { final boolean debug = LOG.isDebugEnabled(); JournalTopicAck ack = new JournalTopicAck(); ack.setDestination(destination); @@ -92,7 +93,7 @@ public class AMQTopicMessageStore extends AMQMessageStore implements TopicMessag if (debug) { LOG.debug("Journalled acknowledge for: " + messageId + ", at: " + location); } - acknowledge(messageId, location, key); + acknowledge(context,messageId, location, clientId,subscriptionName); } else { if (debug) { LOG.debug("Journalled transacted acknowledge for: " + messageId + ", at: " + location); @@ -109,7 +110,7 @@ public class AMQTopicMessageStore extends AMQMessageStore implements TopicMessag } synchronized (AMQTopicMessageStore.this) { inFlightTxLocations.remove(location); - acknowledge(messageId, location, key); + acknowledge(context,messageId, location, clientId,subscriptionName); } } @@ -142,12 +143,16 @@ public class AMQTopicMessageStore extends AMQMessageStore implements TopicMessag * @param messageId * @param location * @param key - * @throws InterruptedIOException + * @throws IOException */ - protected void acknowledge(MessageId messageId, Location location, SubscriptionKey key) throws InterruptedIOException { + protected void acknowledge(ConnectionContext context,MessageId messageId, Location location, String clientId,String subscriptionName) throws IOException { synchronized (this) { lastLocation = location; - ackedLastAckLocations.put(key, messageId); + if (topicReferenceStore.acknowledgeReference(context, clientId, subscriptionName, messageId)){ + MessageAck ack = new MessageAck(); + ack.setLastMessageId(messageId); + removeMessage(context, ack); + } } try { asyncWriteTask.wakeup(); @@ -156,32 +161,6 @@ public class AMQTopicMessageStore extends AMQMessageStore implements TopicMessag } } - @Override - protected Location doAsyncWrite() throws IOException { - final Map cpAckedLastAckLocations; - // swap out the hash maps.. - synchronized (this) { - cpAckedLastAckLocations = this.ackedLastAckLocations; - this.ackedLastAckLocations = new HashMap(); - } - Location location = super.doAsyncWrite(); - - if (cpAckedLastAckLocations != null) { - transactionTemplate.run(new Callback() { - public void execute() throws Exception { - // Checkpoint the acknowledged messages. - Iterator iterator = cpAckedLastAckLocations.keySet().iterator(); - while (iterator.hasNext()) { - SubscriptionKey subscriptionKey = iterator.next(); - MessageId identity = cpAckedLastAckLocations.get(subscriptionKey); - topicReferenceStore.acknowledge(transactionTemplate.getContext(), subscriptionKey.clientId, subscriptionKey.subscriptionName, identity); - } - } - }); - } - return location; - } - /** * @return Returns the longTermStore. */ diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java index aaa07e0244..9fddf34a8e 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java @@ -26,6 +26,7 @@ import org.apache.activemq.kaha.MapContainer; import org.apache.activemq.kaha.StoreEntry; import org.apache.activemq.store.MessageRecoveryListener; import org.apache.activemq.store.ReferenceStore; +import org.apache.activemq.store.ReferenceStore.ReferenceData; import org.apache.activemq.usage.MemoryUsage; import org.apache.activemq.usage.SystemUsage; @@ -120,7 +121,7 @@ public class KahaReferenceStore implements ReferenceStore { } return result.getData(); } - + public void addReferenceFileIdsInUse() { for (StoreEntry entry = messageContainer.getFirst(); entry != null; entry = messageContainer .getNext(entry)) { diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java index 71b9b7084d..f179c130a1 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java @@ -117,12 +117,55 @@ public class KahaTopicReferenceStore extends KahaReferenceStore implements Topic return container; } - public synchronized void acknowledge(ConnectionContext context, String clientId, String subscriptionName, - MessageId messageId) throws IOException { + public synchronized boolean acknowledgeReference(ConnectionContext context, + String clientId, String subscriptionName, MessageId messageId) + throws IOException { + boolean removeMessage = false; String key = getSubscriptionKey(clientId, subscriptionName); TopicSubContainer container = subscriberMessages.get(key); if (container != null) { + ConsumerMessageRef ref = null; + if((ref = container.remove(messageId)) != null) { + TopicSubAck tsa = ackContainer.get(ref.getAckEntry()); + if (tsa != null) { + if (tsa.decrementCount() <= 0) { + StoreEntry entry = ref.getAckEntry(); + entry = ackContainer.refresh(entry); + ackContainer.remove(entry); + ReferenceRecord rr = messageContainer.get(messageId); + if (rr != null) { + entry = tsa.getMessageEntry(); + entry = messageContainer.refresh(entry); + messageContainer.remove(entry); + removeInterest(rr); + removeMessage = true; + }else { + System.err.println("REF RTEC OS NULL!!!"); + } + } else { + System.out.println("RED XOUVT IAS " + tsa.getCount()); + ackContainer.update(ref.getAckEntry(), tsa); + } + }else{ + System.err.println("NO TAS!!!"); + } + }else{ + //no message held + removeMessage = true; + } + } + return removeMessage; + + } + + public synchronized void acknowledge(ConnectionContext context, + String clientId, String subscriptionName, MessageId messageId) + throws IOException { + String key = getSubscriptionKey(clientId, subscriptionName); + + TopicSubContainer container = subscriberMessages.get(key); + if (container != null) { ConsumerMessageRef ref = container.remove(messageId); if (ref != null) { TopicSubAck tsa = ackContainer.get(ref.getAckEntry()); @@ -145,7 +188,7 @@ public class KahaTopicReferenceStore extends KahaReferenceStore implements Topic } } } - } + } public synchronized void addSubsciption(SubscriptionInfo info, boolean retroactive) throws IOException { String key = getSubscriptionKey(info.getClientId(), info.getSubscriptionName()); diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubContainer.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubContainer.java index dc394479a0..8497fea752 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubContainer.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubContainer.java @@ -81,6 +81,22 @@ public class TopicSubContainer { } return result; } + + public ConsumerMessageRef removeFirst() { + ConsumerMessageRef result = null; + if (!listContainer.isEmpty()) { + StoreEntry entry = listContainer.getFirst(); + + result = (ConsumerMessageRef) listContainer.get(entry); + listContainer.remove(entry); + if (listContainer != null && batchEntry != null + && (listContainer.isEmpty() || batchEntry.equals(entry))) { + reset(); + } + + } + return result; + } public ConsumerMessageRef get(StoreEntry entry) { return (ConsumerMessageRef)listContainer.get(entry);