From d85895666a17b5e059548f20f787b0e507ae964b Mon Sep 17 00:00:00 2001 From: Gary Tully Date: Fri, 16 Jul 2010 17:30:25 +0000 Subject: [PATCH] resolve https://issues.apache.org/activemq/browse/AMQ-2832 - impl for AMQ pa git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@964866 13f79535-47bb-0310-9956-ffa450edef68 --- .../activemq/kaha/MessageAckWithLocation.java | 29 +++++++++++++ .../apache/activemq/store/MessageStore.java | 2 - .../activemq/store/amq/AMQMessageStore.java | 13 +++--- .../store/amq/AMQPersistenceAdapter.java | 3 +- .../store/kahadaptor/KahaReferenceStore.java | 17 +++++--- .../kahadaptor/KahaReferenceStoreAdapter.java | 41 ++++++++++++++++++- ...ckReplayAfterStoreCleanupAMQStoreTest.java | 39 ++++++++++++++++++ 7 files changed, 128 insertions(+), 16 deletions(-) create mode 100644 activemq-core/src/main/java/org/apache/activemq/kaha/MessageAckWithLocation.java create mode 100644 activemq-core/src/test/java/org/apache/activemq/bugs/SparseAckReplayAfterStoreCleanupAMQStoreTest.java diff --git a/activemq-core/src/main/java/org/apache/activemq/kaha/MessageAckWithLocation.java b/activemq-core/src/main/java/org/apache/activemq/kaha/MessageAckWithLocation.java new file mode 100644 index 0000000000..5d8d73b245 --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/kaha/MessageAckWithLocation.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.kaha; + +import org.apache.activemq.command.MessageAck; +import org.apache.activemq.kaha.impl.async.Location; + +public final class MessageAckWithLocation extends MessageAck { + public final Location location; + + public MessageAckWithLocation(MessageAck ack, Location location) { + ack.copy(this); + this.location = location; + } +} diff --git a/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java b/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java index a4bcd1a04b..3fbbd1f180 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java @@ -47,7 +47,6 @@ public interface MessageStore extends Service { * * @param context context * @param message - * @param l * @return a Future to track when this is complete * @throws IOException * @throws IOException @@ -59,7 +58,6 @@ public interface MessageStore extends Service { * * @param context context * @param message - * @param l * @return a Future to track when this is complete * @throws IOException * @throws IOException diff --git a/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java b/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java index 7b0c0b3128..cce6ffdc0e 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java @@ -38,6 +38,7 @@ import org.apache.activemq.command.Message; import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageId; import org.apache.activemq.filter.NonCachedMessageEvaluationContext; +import org.apache.activemq.kaha.MessageAckWithLocation; import org.apache.activemq.kaha.impl.async.Location; import org.apache.activemq.store.AbstractMessageStore; import org.apache.activemq.store.MessageRecoveryListener; @@ -70,7 +71,7 @@ public class AMQMessageStore extends AbstractMessageStore { protected final TaskRunner asyncWriteTask; protected CountDownLatch flushLatch; private Map messages = new LinkedHashMap(); - private List messageAcks = new ArrayList(); + private List messageAcks = new ArrayList(); /** A MessageStore that we can use to retrieve messages quickly. */ private Map cpAddedMessageIds; private final boolean debug = LOG.isDebugEnabled(); @@ -255,7 +256,7 @@ public class AMQMessageStore extends AbstractMessageStore { MessageId id = ack.getLastMessageId(); data = messages.remove(id); if (data == null) { - messageAcks.add(ack); + messageAcks.add(new MessageAckWithLocation(ack, location)); } else { // message never got written so datafileReference will still exist AMQMessageStore.this.peristenceAdapter.removeInProgressDataFile(AMQMessageStore.this, data.getFileId()); @@ -350,7 +351,7 @@ public class AMQMessageStore extends AbstractMessageStore { * @throws IOException */ protected Location doAsyncWrite() throws IOException { - final List cpRemovedMessageLocations; + final List cpRemovedMessageLocations; final List cpActiveJournalLocations; final int maxCheckpointMessageAddSize = peristenceAdapter.getMaxCheckpointMessageAddSize(); final Location lastLocation; @@ -361,7 +362,7 @@ public class AMQMessageStore extends AbstractMessageStore { cpRemovedMessageLocations = this.messageAcks; cpActiveJournalLocations = new ArrayList(inFlightTxLocations); this.messages = new LinkedHashMap(); - this.messageAcks = new ArrayList(); + this.messageAcks = new ArrayList(); lastLocation = this.lastLocation; } finally { lock.unlock(); @@ -406,7 +407,7 @@ public class AMQMessageStore extends AbstractMessageStore { persitanceAdapter.commitTransaction(context); persitanceAdapter.beginTransaction(context); // Checkpoint the removed messages. - for (MessageAck ack : cpRemovedMessageLocations) { + for (MessageAckWithLocation ack : cpRemovedMessageLocations) { try { referenceStore.removeMessage(transactionTemplate.getContext(), ack); } catch (Throwable e) { @@ -576,5 +577,5 @@ public class AMQMessageStore extends AbstractMessageStore { } getReferenceStore().setBatch(messageId); } - + } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java b/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java index 640c78f81f..9dd43226b5 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.io.RandomAccessFile; import java.nio.channels.FileLock; import java.util.Date; +import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.Map; @@ -570,7 +571,6 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener, * * @throws IOException * @throws IOException - * @throws InvalidLocationException * @throws IllegalStateException */ private void recover() throws IllegalStateException, IOException { @@ -1051,7 +1051,6 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener, } - protected void lock() throws Exception { lockLogged = false; lockAquired = false; diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java index 4a4a494975..82c4a9c8ab 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java @@ -29,6 +29,7 @@ import org.apache.activemq.command.Message; import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageId; import org.apache.activemq.kaha.MapContainer; +import org.apache.activemq.kaha.MessageAckWithLocation; import org.apache.activemq.kaha.StoreEntry; import org.apache.activemq.store.AbstractMessageStore; import org.apache.activemq.store.MessageRecoveryListener; @@ -203,17 +204,17 @@ public class KahaReferenceStore extends AbstractMessageStore implements Referenc } public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException { - removeMessage(ack.getLastMessageId()); - } - - public void removeMessage(MessageId msgId) throws IOException { lock.lock(); try { + MessageId msgId = ack.getLastMessageId(); StoreEntry entry = messageContainer.getEntry(msgId); if (entry != null) { ReferenceRecord rr = messageContainer.remove(msgId); if (rr != null) { removeInterest(rr); + if (ack instanceof MessageAckWithLocation) { + recordAckFileReferences((MessageAckWithLocation)ack, rr.getData().getFileId()); + } dispatchAudit.isDuplicate(msgId); if (LOG.isDebugEnabled()) { LOG.debug(destination.getPhysicalName() + " remove reference: " + msgId); @@ -230,12 +231,18 @@ public class KahaReferenceStore extends AbstractMessageStore implements Referenc } } + private void recordAckFileReferences(MessageAckWithLocation ack, int messageFileId) { + adapter.recordAckFileReferences(ack.location.getDataFileId(), messageFileId); + } + public void removeAllMessages(ConnectionContext context) throws IOException { lock.lock(); try { Set tmpSet = new HashSet(messageContainer.keySet()); + MessageAck ack = new MessageAck(); for (MessageId id:tmpSet) { - removeMessage(id); + ack.setLastMessageId(id); + removeMessage(null, ack); } resetBatching(); messageContainer.clear(); diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java index 303ac43415..5437ef81c9 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java @@ -252,7 +252,44 @@ public class KahaReferenceStoreAdapter extends KahaPersistenceAdapter implements * @see org.apache.activemq.store.ReferenceStoreAdapter#getReferenceFileIdsInUse() */ public synchronized Set getReferenceFileIdsInUse() throws IOException { - return new HashSet(recordReferences.keySet()); + Set inUse = new HashSet(recordReferences.keySet()); + + Iterator>> ackReferences = ackMessageFileMap.entrySet().iterator(); + while (ackReferences.hasNext()) { + Map.Entry> ackReference = ackReferences.next(); + if (!inUse.contains(ackReference.getKey())) { + // should we keep this data file + for (Integer referencedFileId : ackReference.getValue()) { + if (inUse.contains(referencedFileId)) { + // keep this ack file + inUse.add(ackReference.getKey()); + LOG.debug("not removing data file: " + ackReference.getKey() + + " as contained ack(s) refer to referencedFileId file: " + ackReference.getValue()); + break; + } + } + } + if (!inUse.contains(ackReference.getKey())) { + ackReferences.remove(); + } + } + + return inUse; + } + + Map> ackMessageFileMap = new HashMap>(); + public synchronized void recordAckFileReferences(int ackDataFileId, int messageFileId) { + Set referenceFileIds = ackMessageFileMap.get(Integer.valueOf(ackDataFileId)); + if (referenceFileIds == null) { + referenceFileIds = new HashSet(); + referenceFileIds.add(Integer.valueOf(messageFileId)); + ackMessageFileMap.put(Integer.valueOf(ackDataFileId), referenceFileIds); + } else { + Integer id = Integer.valueOf(messageFileId); + if (!referenceFileIds.contains(id)) { + referenceFileIds.add(id); + } + } } /** @@ -409,4 +446,6 @@ public class KahaReferenceStoreAdapter extends KahaPersistenceAdapter implements public void setIndexLoadFactor(int loadFactor) { this.indexLoadFactor = loadFactor; } + + } diff --git a/activemq-core/src/test/java/org/apache/activemq/bugs/SparseAckReplayAfterStoreCleanupAMQStoreTest.java b/activemq-core/src/test/java/org/apache/activemq/bugs/SparseAckReplayAfterStoreCleanupAMQStoreTest.java new file mode 100644 index 0000000000..103a67357b --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/bugs/SparseAckReplayAfterStoreCleanupAMQStoreTest.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.bugs; + +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.store.amq.AMQPersistenceAdapterFactory; + + +public class SparseAckReplayAfterStoreCleanupAMQStoreTest extends AMQ2832Test { + @Override + protected void configurePersistence(BrokerService brokerService, boolean deleteAllOnStart) throws Exception { + brokerService.setPersistenceFactory(new AMQPersistenceAdapterFactory()); + AMQPersistenceAdapterFactory factory = (AMQPersistenceAdapterFactory) brokerService.getPersistenceFactory(); + // ensure there are a bunch of data files but multiple entries in each + factory.setMaxFileLength(1024 * 12); + // speed up the test case, checkpoint an cleanup early and often + factory.setCheckpointInterval(500); + factory.setCleanupInterval(500); + factory.setSyncOnWrite(false); + if (!deleteAllOnStart) { + factory.setForceRecoverReferenceStore(true); + } + brokerService.getPersistenceAdapter(); + } +}