fix data logs not being correctly removed

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@584863 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2007-10-15 19:31:22 +00:00
parent c5f251ceab
commit 1722001748
11 changed files with 133 additions and 71 deletions

View File

@ -270,6 +270,12 @@ public final class AsyncDataManager {
storeSize.addAndGet(size); storeSize.addAndGet(size);
return currentWriteFile; return currentWriteFile;
} }
public synchronized void removeLocation(Location location) throws IOException{
DataFile dataFile = getDataFile(location);
dataFile.decrement();
}
DataFile getDataFile(Location item) throws IOException { DataFile getDataFile(Location item) throws IOException {
Integer key = Integer.valueOf(item.getDataFileId()); Integer key = Integer.valueOf(item.getDataFileId());
@ -346,6 +352,7 @@ public final class AsyncDataManager {
synchronized void addInterestInFile(DataFile dataFile) { synchronized void addInterestInFile(DataFile dataFile) {
if (dataFile != null) { if (dataFile != null) {
dataFile.increment(); dataFile.increment();
System.err.println("ADD INTEREST: " + dataFile);
} }
} }
@ -355,6 +362,7 @@ public final class AsyncDataManager {
DataFile dataFile = (DataFile)fileMap.get(key); DataFile dataFile = (DataFile)fileMap.get(key);
removeInterestInFile(dataFile); removeInterestInFile(dataFile);
} }
} }
synchronized void removeInterestInFile(DataFile dataFile) throws IOException { synchronized void removeInterestInFile(DataFile dataFile) throws IOException {
@ -362,24 +370,20 @@ public final class AsyncDataManager {
if (dataFile.decrement() <= 0) { if (dataFile.decrement() <= 0) {
removeDataFile(dataFile); removeDataFile(dataFile);
} }
System.err.println("REMOVE INTEREST: " + dataFile);
} }
} }
public synchronized void consolidateDataFilesNotIn(Set<Integer> inUse) throws IOException { public synchronized void consolidateDataFilesNotIn(Set<Integer> inUse) throws IOException {
// Substract and the difference is the set of files that are no longer
// needed :)
Set<Integer> unUsed = new HashSet<Integer>(fileMap.keySet()); Set<Integer> unUsed = new HashSet<Integer>(fileMap.keySet());
unUsed.removeAll(inUse); unUsed.removeAll(inUse);
List<DataFile> purgeList = new ArrayList<DataFile>(); List<DataFile> purgeList = new ArrayList<DataFile>();
for (Integer key : unUsed) { for (Integer key : unUsed) {
DataFile dataFile = (DataFile)fileMap.get(key); DataFile dataFile = (DataFile)fileMap.get(key);
purgeList.add(dataFile); purgeList.add(dataFile);
} }
for (DataFile dataFile : purgeList) { for (DataFile dataFile : purgeList) {
removeDataFile(dataFile); forceRemoveDataFile(dataFile);
} }
} }
@ -399,16 +403,20 @@ public final class AsyncDataManager {
// Make sure we don't delete too much data. // Make sure we don't delete too much data.
if (dataFile == currentWriteFile || mark == null || dataFile.getDataFileId() >= mark.getDataFileId()) { if (dataFile == currentWriteFile || mark == null || dataFile.getDataFileId() >= mark.getDataFileId()) {
return; LOG.debug("Won't remove DataFile" + dataFile);
return;
} }
forceRemoveDataFile(dataFile);
}
private synchronized void forceRemoveDataFile(DataFile dataFile) throws IOException {
accessorPool.disposeDataFileAccessors(dataFile); accessorPool.disposeDataFileAccessors(dataFile);
DataFile removed = fileMap.remove(dataFile.getDataFileId());
fileMap.remove(dataFile.getDataFileId());
storeSize.addAndGet(-dataFile.getLength()); storeSize.addAndGet(-dataFile.getLength());
dataFile.unlink(); dataFile.unlink();
boolean result = dataFile.delete(); boolean result = dataFile.delete();
LOG.debug("discarding data file " + dataFile + (result ? "successful " : "failed")); LOG.debug("discarding data file " + dataFile
+ (result ? "successful " : "failed"));
} }
@ -519,7 +527,8 @@ public final class AsyncDataManager {
} }
public synchronized Location write(ByteSequence data, boolean sync) throws IOException, IllegalStateException { public synchronized Location write(ByteSequence data, boolean sync) throws IOException, IllegalStateException {
return appender.storeItem(data, Location.USER_TYPE, sync); Location loc = appender.storeItem(data, Location.USER_TYPE, sync);
return loc;
} }
public synchronized Location write(ByteSequence data, byte type, boolean sync) throws IOException, IllegalStateException { public synchronized Location write(ByteSequence data, byte type, boolean sync) throws IOException, IllegalStateException {

View File

@ -66,6 +66,10 @@ class DataFile extends LinkedNode implements Comparable<DataFile> {
public synchronized int decrement() { public synchronized int decrement() {
return --referenceCount; return --referenceCount;
} }
public synchronized int getReferenceCount(){
return referenceCount;
}
public synchronized boolean isUnused() { public synchronized boolean isUnused() {
return referenceCount <= 0; return referenceCount <= 0;

View File

@ -59,9 +59,8 @@ public class DataFileAccessorPool {
return rc; return rc;
} }
public void closeDataFileReader(DataFileAccessor reader) { public synchronized void closeDataFileReader(DataFileAccessor reader) {
openCounter--; openCounter--;
used = true;
if (pool.size() >= maxOpenReadersPerFile || disposed) { if (pool.size() >= maxOpenReadersPerFile || disposed) {
reader.dispose(); reader.dispose();
} else { } else {
@ -69,15 +68,15 @@ public class DataFileAccessorPool {
} }
} }
public void clearUsedMark() { public synchronized void clearUsedMark() {
used = false; used = false;
} }
public boolean isUsed() { public synchronized boolean isUsed() {
return used; return used;
} }
public void dispose() { public synchronized void dispose() {
for (DataFileAccessor reader : pool) { for (DataFileAccessor reader : pool) {
reader.dispose(); reader.dispose();
} }
@ -85,7 +84,7 @@ public class DataFileAccessorPool {
disposed = true; disposed = true;
} }
public int getOpenCounter() { public synchronized int getOpenCounter() {
return openCounter; return openCounter;
} }

View File

@ -17,9 +17,10 @@
package org.apache.activemq.store; package org.apache.activemq.store;
import java.io.IOException; import java.io.IOException;
import javax.jms.JMSException; import javax.jms.JMSException;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageId; import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.SubscriptionInfo; import org.apache.activemq.command.SubscriptionInfo;

View File

@ -31,17 +31,20 @@ import org.apache.activemq.command.SubscriptionInfo;
*/ */
public interface TopicReferenceStore extends ReferenceStore, TopicMessageStore { public interface TopicReferenceStore extends ReferenceStore, TopicMessageStore {
/** /**
* Stores the last acknowledged messgeID for the given subscription so that * Removes the last acknowledged messgeID for the given subscription so that
* we can recover and commence dispatching messages from the last checkpoint * we can recover and commence dispatching messages from the last checkpoint
* N.B. - all messages previous to this one for a given subscriber
* should also be acknowledged
* *
* @param context * @param context
* @param clientId * @param clientId
* @param subscriptionName * @param subscriptionName
* @param messageId * @param messageId
* @param subscriptionPersistentId * @param subscriptionPersistentId
* @return true if there are no more references to the message - or the message is null
* @throws IOException * @throws IOException
*/ */
void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId) throws IOException; boolean acknowledgeReference(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId) throws IOException;
/** /**
* @param clientId * @param clientId

View File

@ -180,7 +180,7 @@ public class AMQMessageStore implements MessageStore {
/** /**
*/ */
public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException { public void removeMessage(final ConnectionContext context, final MessageAck ack) throws IOException {
JournalQueueAck remove = new JournalQueueAck(); JournalQueueAck remove = new JournalQueueAck();
remove.setDestination(destination); remove.setDestination(destination);
remove.setMessageAck(ack); remove.setMessageAck(ack);
@ -189,7 +189,7 @@ public class AMQMessageStore implements MessageStore {
if (debug) { if (debug) {
LOG.debug("Journalled message remove for: " + ack.getLastMessageId() + ", at: " + location); LOG.debug("Journalled message remove for: " + ack.getLastMessageId() + ", at: " + location);
} }
removeMessage(ack, location); removeMessage(ack,location);
} else { } else {
if (debug) { if (debug) {
LOG.debug("Journalled transacted message remove for: " + ack.getLastMessageId() + ", at: " + location); LOG.debug("Journalled transacted message remove for: " + ack.getLastMessageId() + ", at: " + location);
@ -206,7 +206,7 @@ public class AMQMessageStore implements MessageStore {
} }
synchronized (AMQMessageStore.this) { synchronized (AMQMessageStore.this) {
inFlightTxLocations.remove(location); inFlightTxLocations.remove(location);
removeMessage(ack, location); removeMessage(ack,location);
} }
} }
@ -240,7 +240,7 @@ public class AMQMessageStore implements MessageStore {
} }
} }
} }
public boolean replayRemoveMessage(ConnectionContext context, MessageAck messageAck) { public boolean replayRemoveMessage(ConnectionContext context, MessageAck messageAck) {
try { try {
// Only remove the message if it has not already been removed. // Only remove the message if it has not already been removed.
@ -378,16 +378,28 @@ public class AMQMessageStore implements MessageStore {
* *
*/ */
public Message getMessage(MessageId identity) throws IOException { public Message getMessage(MessageId identity) throws IOException {
Location location = getLocation(identity);
DataStructure rc = peristenceAdapter.readCommand(location);
try {
return (Message) rc;
} catch (ClassCastException e) {
throw new IOException("Could not read message " + identity
+ " at location " + location
+ ", expected a message, but got: " + rc);
}
}
protected Location getLocation(MessageId messageId) throws IOException {
ReferenceData data = null; ReferenceData data = null;
synchronized (this) { synchronized (this) {
// Is it still in flight??? // Is it still in flight???
data = messages.get(identity); data = messages.get(messageId);
if (data == null && cpAddedMessageIds != null) { if (data == null && cpAddedMessageIds != null) {
data = cpAddedMessageIds.get(identity); data = cpAddedMessageIds.get(messageId);
} }
} }
if (data == null) { if (data == null) {
data = referenceStore.getMessageReference(identity); data = referenceStore.getMessageReference(messageId);
if (data == null) { if (data == null) {
return null; return null;
} }
@ -395,12 +407,7 @@ public class AMQMessageStore implements MessageStore {
Location location = new Location(); Location location = new Location();
location.setDataFileId(data.getFileId()); location.setDataFileId(data.getFileId());
location.setOffset(data.getOffset()); location.setOffset(data.getOffset());
DataStructure rc = peristenceAdapter.readCommand(location); return location;
try {
return (Message)rc;
} catch (ClassCastException e) {
throw new IOException("Could not read message " + identity + " at location " + location + ", expected a message, but got: " + rc);
}
} }
/** /**

View File

@ -83,7 +83,7 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener,
private TaskRunnerFactory taskRunnerFactory; private TaskRunnerFactory taskRunnerFactory;
private WireFormat wireFormat = new OpenWireFormat(); private WireFormat wireFormat = new OpenWireFormat();
private SystemUsage usageManager; private SystemUsage usageManager;
private long cleanupInterval = 1000 * 60; private long cleanupInterval = 1000 * 15;
private long checkpointInterval = 1000 * 10; private long checkpointInterval = 1000 * 10;
private int maxCheckpointWorkers = 1; private int maxCheckpointWorkers = 1;
private int maxCheckpointMessageAddSize = 1024 * 4; private int maxCheckpointMessageAddSize = 1024 * 4;

View File

@ -25,6 +25,7 @@ import java.util.Map;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.JournalTopicAck; import org.apache.activemq.command.JournalTopicAck;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId; import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.SubscriptionInfo; import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.kaha.impl.async.Location; import org.apache.activemq.kaha.impl.async.Location;
@ -77,7 +78,7 @@ public class AMQTopicMessageStore extends AMQMessageStore implements TopicMessag
/** /**
*/ */
public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, final MessageId messageId) throws IOException { public void acknowledge(final ConnectionContext context, final String clientId, final String subscriptionName, final MessageId messageId) throws IOException {
final boolean debug = LOG.isDebugEnabled(); final boolean debug = LOG.isDebugEnabled();
JournalTopicAck ack = new JournalTopicAck(); JournalTopicAck ack = new JournalTopicAck();
ack.setDestination(destination); ack.setDestination(destination);
@ -92,7 +93,7 @@ public class AMQTopicMessageStore extends AMQMessageStore implements TopicMessag
if (debug) { if (debug) {
LOG.debug("Journalled acknowledge for: " + messageId + ", at: " + location); LOG.debug("Journalled acknowledge for: " + messageId + ", at: " + location);
} }
acknowledge(messageId, location, key); acknowledge(context,messageId, location, clientId,subscriptionName);
} else { } else {
if (debug) { if (debug) {
LOG.debug("Journalled transacted acknowledge for: " + messageId + ", at: " + location); LOG.debug("Journalled transacted acknowledge for: " + messageId + ", at: " + location);
@ -109,7 +110,7 @@ public class AMQTopicMessageStore extends AMQMessageStore implements TopicMessag
} }
synchronized (AMQTopicMessageStore.this) { synchronized (AMQTopicMessageStore.this) {
inFlightTxLocations.remove(location); inFlightTxLocations.remove(location);
acknowledge(messageId, location, key); acknowledge(context,messageId, location, clientId,subscriptionName);
} }
} }
@ -142,12 +143,16 @@ public class AMQTopicMessageStore extends AMQMessageStore implements TopicMessag
* @param messageId * @param messageId
* @param location * @param location
* @param key * @param key
* @throws InterruptedIOException * @throws IOException
*/ */
protected void acknowledge(MessageId messageId, Location location, SubscriptionKey key) throws InterruptedIOException { protected void acknowledge(ConnectionContext context,MessageId messageId, Location location, String clientId,String subscriptionName) throws IOException {
synchronized (this) { synchronized (this) {
lastLocation = location; lastLocation = location;
ackedLastAckLocations.put(key, messageId); if (topicReferenceStore.acknowledgeReference(context, clientId, subscriptionName, messageId)){
MessageAck ack = new MessageAck();
ack.setLastMessageId(messageId);
removeMessage(context, ack);
}
} }
try { try {
asyncWriteTask.wakeup(); asyncWriteTask.wakeup();
@ -156,32 +161,6 @@ public class AMQTopicMessageStore extends AMQMessageStore implements TopicMessag
} }
} }
@Override
protected Location doAsyncWrite() throws IOException {
final Map<SubscriptionKey, MessageId> cpAckedLastAckLocations;
// swap out the hash maps..
synchronized (this) {
cpAckedLastAckLocations = this.ackedLastAckLocations;
this.ackedLastAckLocations = new HashMap<SubscriptionKey, MessageId>();
}
Location location = super.doAsyncWrite();
if (cpAckedLastAckLocations != null) {
transactionTemplate.run(new Callback() {
public void execute() throws Exception {
// Checkpoint the acknowledged messages.
Iterator<SubscriptionKey> iterator = cpAckedLastAckLocations.keySet().iterator();
while (iterator.hasNext()) {
SubscriptionKey subscriptionKey = iterator.next();
MessageId identity = cpAckedLastAckLocations.get(subscriptionKey);
topicReferenceStore.acknowledge(transactionTemplate.getContext(), subscriptionKey.clientId, subscriptionKey.subscriptionName, identity);
}
}
});
}
return location;
}
/** /**
* @return Returns the longTermStore. * @return Returns the longTermStore.
*/ */

View File

@ -26,6 +26,7 @@ import org.apache.activemq.kaha.MapContainer;
import org.apache.activemq.kaha.StoreEntry; import org.apache.activemq.kaha.StoreEntry;
import org.apache.activemq.store.MessageRecoveryListener; import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.ReferenceStore; import org.apache.activemq.store.ReferenceStore;
import org.apache.activemq.store.ReferenceStore.ReferenceData;
import org.apache.activemq.usage.MemoryUsage; import org.apache.activemq.usage.MemoryUsage;
import org.apache.activemq.usage.SystemUsage; import org.apache.activemq.usage.SystemUsage;
@ -120,7 +121,7 @@ public class KahaReferenceStore implements ReferenceStore {
} }
return result.getData(); return result.getData();
} }
public void addReferenceFileIdsInUse() { public void addReferenceFileIdsInUse() {
for (StoreEntry entry = messageContainer.getFirst(); entry != null; entry = messageContainer for (StoreEntry entry = messageContainer.getFirst(); entry != null; entry = messageContainer
.getNext(entry)) { .getNext(entry)) {

View File

@ -117,12 +117,55 @@ public class KahaTopicReferenceStore extends KahaReferenceStore implements Topic
return container; return container;
} }
public synchronized void acknowledge(ConnectionContext context, String clientId, String subscriptionName, public synchronized boolean acknowledgeReference(ConnectionContext context,
MessageId messageId) throws IOException { String clientId, String subscriptionName, MessageId messageId)
throws IOException {
boolean removeMessage = false;
String key = getSubscriptionKey(clientId, subscriptionName); String key = getSubscriptionKey(clientId, subscriptionName);
TopicSubContainer container = subscriberMessages.get(key); TopicSubContainer container = subscriberMessages.get(key);
if (container != null) { if (container != null) {
ConsumerMessageRef ref = null;
if((ref = container.remove(messageId)) != null) {
TopicSubAck tsa = ackContainer.get(ref.getAckEntry());
if (tsa != null) {
if (tsa.decrementCount() <= 0) {
StoreEntry entry = ref.getAckEntry();
entry = ackContainer.refresh(entry);
ackContainer.remove(entry);
ReferenceRecord rr = messageContainer.get(messageId);
if (rr != null) {
entry = tsa.getMessageEntry();
entry = messageContainer.refresh(entry);
messageContainer.remove(entry);
removeInterest(rr);
removeMessage = true;
}else {
System.err.println("REF RTEC OS NULL!!!");
}
} else {
System.out.println("RED XOUVT IAS " + tsa.getCount());
ackContainer.update(ref.getAckEntry(), tsa);
}
}else{
System.err.println("NO TAS!!!");
}
}else{
//no message held
removeMessage = true;
}
}
return removeMessage;
}
public synchronized void acknowledge(ConnectionContext context,
String clientId, String subscriptionName, MessageId messageId)
throws IOException {
String key = getSubscriptionKey(clientId, subscriptionName);
TopicSubContainer container = subscriberMessages.get(key);
if (container != null) {
ConsumerMessageRef ref = container.remove(messageId); ConsumerMessageRef ref = container.remove(messageId);
if (ref != null) { if (ref != null) {
TopicSubAck tsa = ackContainer.get(ref.getAckEntry()); TopicSubAck tsa = ackContainer.get(ref.getAckEntry());
@ -145,7 +188,7 @@ public class KahaTopicReferenceStore extends KahaReferenceStore implements Topic
} }
} }
} }
} }
public synchronized void addSubsciption(SubscriptionInfo info, boolean retroactive) throws IOException { public synchronized void addSubsciption(SubscriptionInfo info, boolean retroactive) throws IOException {
String key = getSubscriptionKey(info.getClientId(), info.getSubscriptionName()); String key = getSubscriptionKey(info.getClientId(), info.getSubscriptionName());

View File

@ -81,6 +81,22 @@ public class TopicSubContainer {
} }
return result; return result;
} }
public ConsumerMessageRef removeFirst() {
ConsumerMessageRef result = null;
if (!listContainer.isEmpty()) {
StoreEntry entry = listContainer.getFirst();
result = (ConsumerMessageRef) listContainer.get(entry);
listContainer.remove(entry);
if (listContainer != null && batchEntry != null
&& (listContainer.isEmpty() || batchEntry.equals(entry))) {
reset();
}
}
return result;
}
public ConsumerMessageRef get(StoreEntry entry) { public ConsumerMessageRef get(StoreEntry entry) {
return (ConsumerMessageRef)listContainer.get(entry); return (ConsumerMessageRef)listContainer.get(entry);