mirror of
https://github.com/apache/activemq.git
synced 2025-02-18 16:10:45 +00:00
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@646422 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
8bf9c17dd9
commit
aba1195cc5
@ -54,6 +54,9 @@ public class ContainerKeySetIterator implements Iterator {
|
|||||||
public void remove() {
|
public void remove() {
|
||||||
if (currentItem != null) {
|
if (currentItem != null) {
|
||||||
container.remove(currentItem);
|
container.remove(currentItem);
|
||||||
|
if (nextItem != null) {
|
||||||
|
list.refreshEntry(nextItem);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -17,6 +17,8 @@
|
|||||||
package org.apache.activemq.store.kahadaptor;
|
package org.apache.activemq.store.kahadaptor;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.Set;
|
||||||
import java.util.concurrent.locks.Lock;
|
import java.util.concurrent.locks.Lock;
|
||||||
import java.util.concurrent.locks.ReentrantLock;
|
import java.util.concurrent.locks.ReentrantLock;
|
||||||
|
|
||||||
@ -192,11 +194,15 @@ public class KahaReferenceStore implements ReferenceStore {
|
|||||||
public void removeAllMessages(ConnectionContext context) throws IOException {
|
public void removeAllMessages(ConnectionContext context) throws IOException {
|
||||||
lock.lock();
|
lock.lock();
|
||||||
try {
|
try {
|
||||||
|
Set<MessageId> tmpSet = new HashSet(messageContainer.keySet());
|
||||||
|
for (MessageId id:tmpSet) {
|
||||||
|
removeMessage(id);
|
||||||
|
}
|
||||||
|
resetBatching();
|
||||||
messageContainer.clear();
|
messageContainer.clear();
|
||||||
}finally {
|
}finally {
|
||||||
lock.unlock();
|
lock.unlock();
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public ActiveMQDestination getDestination() {
|
public ActiveMQDestination getDestination() {
|
||||||
|
@ -17,8 +17,10 @@
|
|||||||
package org.apache.activemq.store.kahadaptor;
|
package org.apache.activemq.store.kahadaptor;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.HashSet;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.locks.Lock;
|
import java.util.concurrent.locks.Lock;
|
||||||
|
|
||||||
@ -301,6 +303,23 @@ public class KahaTopicReferenceStore extends KahaReferenceStore implements Topic
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void removeAllMessages(ConnectionContext context) throws IOException {
|
||||||
|
lock.lock();
|
||||||
|
try {
|
||||||
|
Set<String> tmpSet = new HashSet<String>(subscriberContainer.keySet());
|
||||||
|
for (String key:tmpSet) {
|
||||||
|
TopicSubContainer container = subscriberMessages.get(key);
|
||||||
|
if (container != null) {
|
||||||
|
container.clear();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
ackContainer.clear();
|
||||||
|
}finally {
|
||||||
|
lock.unlock();
|
||||||
|
}
|
||||||
|
super.removeAllMessages(context);
|
||||||
|
}
|
||||||
|
|
||||||
protected void removeSubscriberMessageContainer(String clientId, String subscriptionName) throws IOException {
|
protected void removeSubscriberMessageContainer(String clientId, String subscriptionName) throws IOException {
|
||||||
String subscriberKey = getSubscriptionKey(clientId, subscriptionName);
|
String subscriberKey = getSubscriptionKey(clientId, subscriptionName);
|
||||||
String containerName = getSubscriptionContainerName(subscriberKey);
|
String containerName = getSubscriptionContainerName(subscriberKey);
|
||||||
|
Loading…
x
Reference in New Issue
Block a user