git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@964866 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2010-07-16 17:30:25 +00:00
parent 145c80a298
commit d85895666a
7 changed files with 128 additions and 16 deletions

View File

@ -0,0 +1,29 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.kaha;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.kaha.impl.async.Location;
public final class MessageAckWithLocation extends MessageAck {
public final Location location;
public MessageAckWithLocation(MessageAck ack, Location location) {
ack.copy(this);
this.location = location;
}
}

View File

@ -47,7 +47,6 @@ public interface MessageStore extends Service {
*
* @param context context
* @param message
* @param l
* @return a Future to track when this is complete
* @throws IOException
* @throws IOException
@ -59,7 +58,6 @@ public interface MessageStore extends Service {
*
* @param context context
* @param message
* @param l
* @return a Future to track when this is complete
* @throws IOException
* @throws IOException

View File

@ -38,6 +38,7 @@ import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
import org.apache.activemq.kaha.MessageAckWithLocation;
import org.apache.activemq.kaha.impl.async.Location;
import org.apache.activemq.store.AbstractMessageStore;
import org.apache.activemq.store.MessageRecoveryListener;
@ -70,7 +71,7 @@ public class AMQMessageStore extends AbstractMessageStore {
protected final TaskRunner asyncWriteTask;
protected CountDownLatch flushLatch;
private Map<MessageId, ReferenceData> messages = new LinkedHashMap<MessageId, ReferenceData>();
private List<MessageAck> messageAcks = new ArrayList<MessageAck>();
private List<MessageAckWithLocation> messageAcks = new ArrayList<MessageAckWithLocation>();
/** A MessageStore that we can use to retrieve messages quickly. */
private Map<MessageId, ReferenceData> cpAddedMessageIds;
private final boolean debug = LOG.isDebugEnabled();
@ -255,7 +256,7 @@ public class AMQMessageStore extends AbstractMessageStore {
MessageId id = ack.getLastMessageId();
data = messages.remove(id);
if (data == null) {
messageAcks.add(ack);
messageAcks.add(new MessageAckWithLocation(ack, location));
} else {
// message never got written so datafileReference will still exist
AMQMessageStore.this.peristenceAdapter.removeInProgressDataFile(AMQMessageStore.this, data.getFileId());
@ -350,7 +351,7 @@ public class AMQMessageStore extends AbstractMessageStore {
* @throws IOException
*/
protected Location doAsyncWrite() throws IOException {
final List<MessageAck> cpRemovedMessageLocations;
final List<MessageAckWithLocation> cpRemovedMessageLocations;
final List<Location> cpActiveJournalLocations;
final int maxCheckpointMessageAddSize = peristenceAdapter.getMaxCheckpointMessageAddSize();
final Location lastLocation;
@ -361,7 +362,7 @@ public class AMQMessageStore extends AbstractMessageStore {
cpRemovedMessageLocations = this.messageAcks;
cpActiveJournalLocations = new ArrayList<Location>(inFlightTxLocations);
this.messages = new LinkedHashMap<MessageId, ReferenceData>();
this.messageAcks = new ArrayList<MessageAck>();
this.messageAcks = new ArrayList<MessageAckWithLocation>();
lastLocation = this.lastLocation;
} finally {
lock.unlock();
@ -406,7 +407,7 @@ public class AMQMessageStore extends AbstractMessageStore {
persitanceAdapter.commitTransaction(context);
persitanceAdapter.beginTransaction(context);
// Checkpoint the removed messages.
for (MessageAck ack : cpRemovedMessageLocations) {
for (MessageAckWithLocation ack : cpRemovedMessageLocations) {
try {
referenceStore.removeMessage(transactionTemplate.getContext(), ack);
} catch (Throwable e) {
@ -576,5 +577,5 @@ public class AMQMessageStore extends AbstractMessageStore {
}
getReferenceStore().setBatch(messageId);
}
}

View File

@ -21,6 +21,7 @@ import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.channels.FileLock;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
@ -570,7 +571,6 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener,
*
* @throws IOException
* @throws IOException
* @throws InvalidLocationException
* @throws IllegalStateException
*/
private void recover() throws IllegalStateException, IOException {
@ -1051,7 +1051,6 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener,
}
protected void lock() throws Exception {
lockLogged = false;
lockAquired = false;

View File

@ -29,6 +29,7 @@ import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.kaha.MapContainer;
import org.apache.activemq.kaha.MessageAckWithLocation;
import org.apache.activemq.kaha.StoreEntry;
import org.apache.activemq.store.AbstractMessageStore;
import org.apache.activemq.store.MessageRecoveryListener;
@ -203,17 +204,17 @@ public class KahaReferenceStore extends AbstractMessageStore implements Referenc
}
public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
removeMessage(ack.getLastMessageId());
}
public void removeMessage(MessageId msgId) throws IOException {
lock.lock();
try {
MessageId msgId = ack.getLastMessageId();
StoreEntry entry = messageContainer.getEntry(msgId);
if (entry != null) {
ReferenceRecord rr = messageContainer.remove(msgId);
if (rr != null) {
removeInterest(rr);
if (ack instanceof MessageAckWithLocation) {
recordAckFileReferences((MessageAckWithLocation)ack, rr.getData().getFileId());
}
dispatchAudit.isDuplicate(msgId);
if (LOG.isDebugEnabled()) {
LOG.debug(destination.getPhysicalName() + " remove reference: " + msgId);
@ -230,12 +231,18 @@ public class KahaReferenceStore extends AbstractMessageStore implements Referenc
}
}
private void recordAckFileReferences(MessageAckWithLocation ack, int messageFileId) {
adapter.recordAckFileReferences(ack.location.getDataFileId(), messageFileId);
}
public void removeAllMessages(ConnectionContext context) throws IOException {
lock.lock();
try {
Set<MessageId> tmpSet = new HashSet<MessageId>(messageContainer.keySet());
MessageAck ack = new MessageAck();
for (MessageId id:tmpSet) {
removeMessage(id);
ack.setLastMessageId(id);
removeMessage(null, ack);
}
resetBatching();
messageContainer.clear();

View File

@ -252,7 +252,44 @@ public class KahaReferenceStoreAdapter extends KahaPersistenceAdapter implements
* @see org.apache.activemq.store.ReferenceStoreAdapter#getReferenceFileIdsInUse()
*/
public synchronized Set<Integer> getReferenceFileIdsInUse() throws IOException {
return new HashSet<Integer>(recordReferences.keySet());
Set inUse = new HashSet<Integer>(recordReferences.keySet());
Iterator<Map.Entry<Integer, Set<Integer>>> ackReferences = ackMessageFileMap.entrySet().iterator();
while (ackReferences.hasNext()) {
Map.Entry<Integer, Set<Integer>> ackReference = ackReferences.next();
if (!inUse.contains(ackReference.getKey())) {
// should we keep this data file
for (Integer referencedFileId : ackReference.getValue()) {
if (inUse.contains(referencedFileId)) {
// keep this ack file
inUse.add(ackReference.getKey());
LOG.debug("not removing data file: " + ackReference.getKey()
+ " as contained ack(s) refer to referencedFileId file: " + ackReference.getValue());
break;
}
}
}
if (!inUse.contains(ackReference.getKey())) {
ackReferences.remove();
}
}
return inUse;
}
Map<Integer, Set<Integer>> ackMessageFileMap = new HashMap<Integer, Set<Integer>>();
public synchronized void recordAckFileReferences(int ackDataFileId, int messageFileId) {
Set<Integer> referenceFileIds = ackMessageFileMap.get(Integer.valueOf(ackDataFileId));
if (referenceFileIds == null) {
referenceFileIds = new HashSet<Integer>();
referenceFileIds.add(Integer.valueOf(messageFileId));
ackMessageFileMap.put(Integer.valueOf(ackDataFileId), referenceFileIds);
} else {
Integer id = Integer.valueOf(messageFileId);
if (!referenceFileIds.contains(id)) {
referenceFileIds.add(id);
}
}
}
/**
@ -409,4 +446,6 @@ public class KahaReferenceStoreAdapter extends KahaPersistenceAdapter implements
public void setIndexLoadFactor(int loadFactor) {
this.indexLoadFactor = loadFactor;
}
}

View File

@ -0,0 +1,39 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.bugs;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.store.amq.AMQPersistenceAdapterFactory;
public class SparseAckReplayAfterStoreCleanupAMQStoreTest extends AMQ2832Test {
@Override
protected void configurePersistence(BrokerService brokerService, boolean deleteAllOnStart) throws Exception {
brokerService.setPersistenceFactory(new AMQPersistenceAdapterFactory());
AMQPersistenceAdapterFactory factory = (AMQPersistenceAdapterFactory) brokerService.getPersistenceFactory();
// ensure there are a bunch of data files but multiple entries in each
factory.setMaxFileLength(1024 * 12);
// speed up the test case, checkpoint an cleanup early and often
factory.setCheckpointInterval(500);
factory.setCleanupInterval(500);
factory.setSyncOnWrite(false);
if (!deleteAllOnStart) {
factory.setForceRecoverReferenceStore(true);
}
brokerService.getPersistenceAdapter();
}
}