mirror of https://github.com/apache/activemq.git
synchronized around memoryTable - prevent concurrent access whilst iterating
for recovery git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@378727 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
068c64639f
commit
7f384b620e
|
@ -72,12 +72,15 @@ public class MemoryMessageStore implements MessageStore {
|
|||
}
|
||||
|
||||
public void recover(MessageRecoveryListener listener) throws Throwable {
|
||||
for (Iterator iter = messageTable.values().iterator(); iter.hasNext();) {
|
||||
Object msg = (Object) iter.next();
|
||||
if( msg.getClass() == String.class ) {
|
||||
listener.recoverMessageReference((String) msg);
|
||||
} else {
|
||||
listener.recoverMessage((Message) msg);
|
||||
// the message table is a synchronizedMap - so just have to synchronize here
|
||||
synchronized(messageTable){
|
||||
for(Iterator iter=messageTable.values().iterator();iter.hasNext();){
|
||||
Object msg=(Object) iter.next();
|
||||
if(msg.getClass()==String.class){
|
||||
listener.recoverMessageReference((String) msg);
|
||||
}else{
|
||||
listener.recoverMessage((Message) msg);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -88,20 +88,24 @@ public class MemoryTopicMessageStore extends MemoryMessageStore implements Topic
|
|||
subscriberDatabase.remove(key);
|
||||
}
|
||||
|
||||
public void recoverSubscription(String clientId, String subscriptionName, MessageRecoveryListener listener) throws Throwable {
|
||||
MessageId lastAck = (MessageId) ackDatabase.get(new SubscriptionKey(clientId, subscriptionName));
|
||||
boolean pastLastAck = lastAck==null;
|
||||
for (Iterator iter = messageTable.entrySet().iterator(); iter.hasNext();) {
|
||||
Map.Entry entry = (Entry) iter.next();
|
||||
if( pastLastAck ) {
|
||||
Object msg = entry.getValue();
|
||||
if( msg.getClass() == String.class ) {
|
||||
listener.recoverMessageReference((String) msg);
|
||||
} else {
|
||||
listener.recoverMessage((Message) msg);
|
||||
public void recoverSubscription(String clientId,String subscriptionName,MessageRecoveryListener listener)
|
||||
throws Throwable{
|
||||
MessageId lastAck=(MessageId) ackDatabase.get(new SubscriptionKey(clientId,subscriptionName));
|
||||
boolean pastLastAck=lastAck==null;
|
||||
// the message table is a synchronizedMap - so just have to synchronize here
|
||||
synchronized(messageTable){
|
||||
for(Iterator iter=messageTable.entrySet().iterator();iter.hasNext();){
|
||||
Map.Entry entry=(Entry) iter.next();
|
||||
if(pastLastAck){
|
||||
Object msg=entry.getValue();
|
||||
if(msg.getClass()==String.class){
|
||||
listener.recoverMessageReference((String) msg);
|
||||
}else{
|
||||
listener.recoverMessage((Message) msg);
|
||||
}
|
||||
}else{
|
||||
pastLastAck=entry.getKey().equals(lastAck);
|
||||
}
|
||||
} else {
|
||||
pastLastAck = entry.getKey().equals(lastAck);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue