Fixed a few of the broken durable subscription tests.

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@377936 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2006-02-15 04:35:12 +00:00
parent 74588d1b30
commit 0f502fae3d
15 changed files with 242 additions and 231 deletions

View File

@ -347,6 +347,9 @@
<!-- http://jira.activemq.org/jira/browse/AMQ-522 -->
<exclude>**/ProxyConnectorTest.*</exclude>
<!-- http://jira.activemq.org/jira/browse/AMQ-560 -->
<exclude>**/FanoutTransportBrokerTest.*</exclude>
</excludes>
</unitTest>

View File

@ -50,8 +50,8 @@ public class ManagedQueueRegion extends QueueRegion {
super.destroySubscription(sub);
}
protected Destination createDestination(ActiveMQDestination destination) throws Throwable {
Destination rc = super.createDestination(destination);
protected Destination createDestination(ConnectionContext context, ActiveMQDestination destination) throws Throwable {
Destination rc = super.createDestination(context, destination);
regionBroker.register(destination, rc);
return rc;
}

View File

@ -48,8 +48,8 @@ public class ManagedTempQueueRegion extends TempQueueRegion {
super.destroySubscription(sub);
}
protected Destination createDestination(ActiveMQDestination destination) throws Throwable {
Destination rc = super.createDestination(destination);
protected Destination createDestination(ConnectionContext context, ActiveMQDestination destination) throws Throwable {
Destination rc = super.createDestination(context, destination);
regionBroker.register(destination, rc);
return rc;
}

View File

@ -48,8 +48,8 @@ public class ManagedTempTopicRegion extends TempTopicRegion {
super.destroySubscription(sub);
}
protected Destination createDestination(ActiveMQDestination destination) throws Throwable {
Destination rc = super.createDestination(destination);
protected Destination createDestination(ConnectionContext context, ActiveMQDestination destination) throws Throwable {
Destination rc = super.createDestination(context, destination);
regionBroker.register(destination, rc);
return rc;
}

View File

@ -50,8 +50,8 @@ public class ManagedTopicRegion extends TopicRegion {
super.destroySubscription(sub);
}
protected Destination createDestination(ActiveMQDestination destination) throws Throwable {
Destination rc = super.createDestination(destination);
protected Destination createDestination(ConnectionContext context, ActiveMQDestination destination) throws Throwable {
Destination rc = super.createDestination(context, destination);
regionBroker.register(destination, rc);
return rc;
}

View File

@ -73,7 +73,7 @@ abstract public class AbstractRegion implements Region {
public Destination addDestination(ConnectionContext context, ActiveMQDestination destination) throws Throwable {
log.debug("Adding destination: "+destination);
Destination dest = createDestination(destination);
Destination dest = createDestination(context, destination);
dest.start();
synchronized(destinationsMutex){
destinations.put(destination,dest);
@ -241,7 +241,7 @@ abstract public class AbstractRegion implements Region {
}
protected abstract Subscription createSubscription(ConnectionContext context, ConsumerInfo info) throws Throwable;
abstract protected Destination createDestination(ActiveMQDestination destination) throws Throwable;
abstract protected Destination createDestination(ConnectionContext context, ActiveMQDestination destination) throws Throwable;
public boolean isAutoCreateDestinations() {
return autoCreateDestinations;

View File

@ -27,76 +27,66 @@ import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.util.SubscriptionKey;
import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
public class DurableTopicSubscription extends PrefetchSubscription {
final protected String clientId;
final protected String subscriptionName;
final ConcurrentHashMap redeliveredMessages = new ConcurrentHashMap();
boolean active=true;
boolean recovered=true;
private final ConcurrentHashMap redeliveredMessages = new ConcurrentHashMap();
private final ConcurrentHashMap destinations = new ConcurrentHashMap();
private final SubscriptionKey subscriptionKey;
private boolean active=false;
public DurableTopicSubscription(Broker broker,ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {
super(broker,context, info);
this.clientId = context.getClientId();
this.subscriptionName = info.getSubcriptionName();
}
public DurableTopicSubscription(Broker broker,SubscriptionInfo info) throws InvalidSelectorException {
super(broker,null, createFakeConsumerInfo(info));
this.clientId = info.getClientId();
this.subscriptionName = info.getSubcriptionName();
active=false;
recovered=false;
}
private static ConsumerInfo createFakeConsumerInfo(SubscriptionInfo info) {
ConsumerInfo rc = new ConsumerInfo();
rc.setSelector(info.getSelector());
rc.setSubcriptionName(info.getSubcriptionName());
rc.setDestination(info.getDestination());
return rc;
subscriptionKey = new SubscriptionKey(context.getClientId(), info.getSubcriptionName());
}
synchronized public boolean isActive() {
return active;
}
synchronized public boolean isRecovered() {
return recovered;
}
protected boolean isFull() {
return !active || super.isFull();
}
synchronized public void gc() {
if( !active && recovered ) {
recovered = false;
for (Iterator iter = dispatched.iterator(); iter.hasNext();) {
MessageReference node = (MessageReference) iter.next();
// node.decrementTargetCount();
iter.remove();
}
for (Iterator iter = matched.iterator(); iter.hasNext();) {
MessageReference node = (MessageReference) iter.next();
// node.decrementTargetCount();
iter.remove();
}
delivered=0;
synchronized public void add(ConnectionContext context, Destination destination) throws Throwable {
super.add(context, destination);
destinations.put(destination.getActiveMQDestination(), destination);
if( active ) {
Topic topic = (Topic) destination;
topic.activate(context, this);
}
}
synchronized public void deactivate() {
synchronized public void activate(ConnectionContext context, ConsumerInfo info) throws Throwable {
if( !active ) {
this.active = true;
this.context = context;
this.info = info;
for (Iterator iter = destinations.values().iterator(); iter.hasNext();) {
Topic topic = (Topic) iter.next();
topic.activate(context, this);
}
if( !isFull() ) {
dispatchMatched();
}
}
}
synchronized public void deactivate() throws Throwable {
active=false;
for (Iterator iter = destinations.values().iterator(); iter.hasNext();) {
Topic topic = (Topic) iter.next();
topic.deactivate(context, this);
}
for (Iterator iter = dispatched.iterator(); iter.hasNext();) {
// Mark the dispatched messages as redelivered for next time.
MessageReference node = (MessageReference) iter.next();
Integer count = (Integer) redeliveredMessages.get(node.getMessageId());
if( count !=null ) {
@ -105,32 +95,16 @@ public class DurableTopicSubscription extends PrefetchSubscription {
redeliveredMessages.put(node.getMessageId(), new Integer(1));
}
// Undo the dispatch.
matched.addFirst(node);
iter.remove();
}
for (Iterator iter = matched.iterator(); iter.hasNext();) {
MessageReference node = (MessageReference) iter.next();
// node.decrementTargetCount();
iter.remove();
}
delivered=0;
}
synchronized public void activate(ConnectionContext context, ConsumerInfo info) throws Throwable {
if( !active ) {
this.active = true;
this.context = context;
this.info = info;
if( !recovered ) {
recovered=true;
for (Iterator iter = destinations.iterator(); iter.hasNext();) {
Topic topic = (Topic) iter.next();
topic.recover(context, this, false);
}
} else {
if( !isFull() ) {
dispatchMatched();
}
}
}
}
protected MessageDispatch createMessageDispatch(MessageReference node, Message message) {
MessageDispatch md = super.createMessageDispatch(node, message);
Integer count = (Integer) redeliveredMessages.get(node.getMessageId());
@ -141,7 +115,9 @@ public class DurableTopicSubscription extends PrefetchSubscription {
}
synchronized public void add(MessageReference node) throws Throwable {
assert recovered;
if( !active ) {
return;
}
node = new IndirectMessageReference(node.getRegionDestination(), (Message) node);
super.add(node);
node.decrementReferenceCount();
@ -152,7 +128,6 @@ public class DurableTopicSubscription extends PrefetchSubscription {
}
public synchronized void acknowledge(ConnectionContext context, MessageAck ack) throws Throwable {
assert recovered;
super.acknowledge(context, ack);
}
@ -163,7 +138,7 @@ public class DurableTopicSubscription extends PrefetchSubscription {
}
public String getSubscriptionName() {
return subscriptionName;
return subscriptionKey.getSubscriptionName();
}
public String toString() {
@ -177,7 +152,11 @@ public class DurableTopicSubscription extends PrefetchSubscription {
}
public String getClientId() {
return clientId;
return subscriptionKey.getClientId();
}
public SubscriptionKey getSubscriptionKey() {
return subscriptionKey;
}
}

View File

@ -37,6 +37,7 @@ import java.util.LinkedList;
* @version $Revision: 1.15 $
*/
abstract public class PrefetchSubscription extends AbstractSubscription{
static private final Log log=LogFactory.getLog(PrefetchSubscription.class);
final protected LinkedList matched=new LinkedList();
final protected LinkedList dispatched=new LinkedList();
@ -45,12 +46,16 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
int preLoadSize=0;
boolean dispatching=false;
long enqueueCounter;
long dispatchCounter;
public PrefetchSubscription(Broker broker,ConnectionContext context,ConsumerInfo info)
throws InvalidSelectorException{
super(broker,context,info);
}
synchronized public void add(MessageReference node) throws Throwable{
enqueueCounter++;
if(!isFull()&&!isSlaveBroker()){
dispatch(node);
}else{
@ -244,6 +249,7 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
}
// Make sure we can dispatch a message.
if(canDispatch(node)&&!isSlaveBroker()){
dispatchCounter++;
MessageDispatch md=createMessageDispatch(node,message);
dispatched.addLast(node);
incrementPreloadSize(node.getMessage().getSize());
@ -325,4 +331,13 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
*/
protected void acknowledge(ConnectionContext context,final MessageAck ack,final MessageReference node)
throws IOException{}
public long getDispatchCounter() {
return dispatchCounter;
}
public long getEnqueueCounter() {
return enqueueCounter;
}
}

View File

@ -54,7 +54,7 @@ public class QueueRegion extends AbstractRegion {
// Implementation methods
// -------------------------------------------------------------------------
protected Destination createDestination(ActiveMQDestination destination) throws Throwable {
protected Destination createDestination(ConnectionContext context, ActiveMQDestination destination) throws Throwable {
MessageStore store = persistenceAdapter.createQueueMessageStore((ActiveMQQueue) destination);
Queue queue = new Queue(destination, memoryManager, store, destinationStatistics, taskRunnerFactory);
configureQueue(queue, destination);

View File

@ -38,7 +38,7 @@ public class TempQueueRegion extends AbstractRegion {
setAutoCreateDestinations(false);
}
protected Destination createDestination(ActiveMQDestination destination) throws Throwable {
protected Destination createDestination(ConnectionContext context, ActiveMQDestination destination) throws Throwable {
final ActiveMQTempDestination tempDest = (ActiveMQTempDestination) destination;
return new Queue(destination, memoryManager, null, destinationStatistics, taskRunnerFactory) {

View File

@ -37,7 +37,7 @@ public class TempTopicRegion extends AbstractRegion {
setAutoCreateDestinations(false);
}
protected Destination createDestination(ActiveMQDestination destination) throws Throwable {
protected Destination createDestination(ConnectionContext context, ActiveMQDestination destination) throws Throwable {
final ActiveMQTempDestination tempDest = (ActiveMQTempDestination) destination;
return new Topic(destination, null, memoryManager, destinationStatistics, taskRunnerFactory) {

View File

@ -42,8 +42,8 @@ import org.apache.activemq.thread.Valve;
import org.apache.activemq.transaction.Synchronization;
import org.apache.activemq.util.SubscriptionKey;
import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList;
import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger;
/**
* The Topic is a destination that sends a copy of a message to every active
@ -64,7 +64,7 @@ public class Topic implements Destination {
private SubscriptionRecoveryPolicy subscriptionRecoveryPolicy = new FixedCountSubscriptionRecoveryPolicy();
private boolean sendAdvisoryIfNoConsumers;
private DeadLetterStrategy deadLetterStrategy = new SharedDeadLetterStrategy();
private AtomicInteger durableSubscriberCounter = new AtomicInteger();
private final ConcurrentHashMap durableSubcribers = new ConcurrentHashMap();
public Topic(ActiveMQDestination destination, TopicMessageStore store, UsageManager memoryManager, DestinationStatistics parentStats,
TaskRunnerFactory taskFactory) {
@ -72,15 +72,6 @@ public class Topic implements Destination {
this.destination = destination;
this.store = store;
this.usageManager = memoryManager;
// TODO: switch back when cache is working again.
// this.cache = cache;
// destinationStatistics.setMessagesCached(cache.getMessagesCached());
// CacheEvictionUsageListener listener = new
// CacheEvictionUsageListener(memoryManager, 90, 50, taskFactory);
// listener.add(cache);
// this.memoryManager.addUsageListener(listener);
this.destinationStatistics.setParent(parentStats);
}
@ -89,25 +80,14 @@ public class Topic implements Destination {
}
public void addSubscription(ConnectionContext context, final Subscription sub) throws Throwable {
destinationStatistics.getConsumers().increment();
sub.add(context, this);
if (sub.getConsumerInfo().isDurable()) {
recover(context, (DurableTopicSubscription) sub, true);
}
else {
recover(context, sub);
}
}
/**
* Used to recover the message list non durable subscriptions. Recovery only happens if the consumer is
* retroactive.
*
* @param context
* @param sub
* @throws Throwable
*/
private void recover(ConnectionContext context, final Subscription sub) throws Throwable {
sub.add(context, this);
if ( !sub.getConsumerInfo().isDurable() ) {
destinationStatistics.getConsumers().increment();
// Do a retroactive recovery if needed.
if (sub.getConsumerInfo().isRetroactive()) {
// synchronize with dispatch method so that no new messages are sent
@ -129,17 +109,37 @@ public class Topic implements Destination {
consumers.add(sub);
}
}
} else {
DurableTopicSubscription dsub = (DurableTopicSubscription) sub;
durableSubcribers.put(dsub.getSubscriptionKey(), dsub);
}
}
/**
* Used to recover the message list for a durable subscription.
*
* @param context
* @param sub
* @param initialActivation
* @throws Throwable
*/
public void recover(ConnectionContext context, final DurableTopicSubscription sub, boolean initialActivation) throws Throwable {
public void removeSubscription(ConnectionContext context, Subscription sub) throws Throwable {
if ( !sub.getConsumerInfo().isDurable() ) {
destinationStatistics.getConsumers().decrement();
synchronized(consumers) {
consumers.remove(sub);
}
}
sub.remove(context, this);
}
public void addInactiveSubscription(ConnectionContext context, DurableTopicSubscription sub) throws Throwable {
sub.add(context, this);
destinationStatistics.getConsumers().increment();
durableSubcribers.put(sub.getSubscriptionKey(), sub);
}
public void deleteSubscription(ConnectionContext context, SubscriptionKey key) throws IOException {
if (store != null) {
store.deleteSubscription(key.clientId, key.subscriptionName);
durableSubcribers.remove(key);
destinationStatistics.getConsumers().decrement();
}
}
public void activate(ConnectionContext context, final DurableTopicSubscription subscription) throws Throwable {
// synchronize with dispatch method so that no new messages are sent
// while
@ -147,18 +147,17 @@ public class Topic implements Destination {
dispatchValve.turnOff();
try {
boolean persistenceWasOptimized = canOptimizeOutPersistence();
if (initialActivation) {
synchronized(consumers) {
consumers.add(sub);
durableSubscriberCounter.incrementAndGet();
}
consumers.add(subscription);
}
if (store != null) {
String clientId = sub.getClientId();
String subscriptionName = sub.getSubscriptionName();
String selector = sub.getConsumerInfo().getSelector();
if (store == null )
return;
// Recover the durable subscription.
String clientId = subscription.getClientId();
String subscriptionName = subscription.getSubscriptionName();
String selector = subscription.getConsumerInfo().getSelector();
SubscriptionInfo info = store.lookupSubscription(clientId, subscriptionName);
if (info != null) {
// Check to see if selector changed.
@ -171,10 +170,9 @@ public class Topic implements Destination {
}
// Do we need to create the subscription?
if (info == null) {
store.addSubsciption(clientId, subscriptionName, selector, sub.getConsumerInfo().isRetroactive());
store.addSubsciption(clientId, subscriptionName, selector, subscription.getConsumerInfo().isRetroactive());
}
if (sub.isRecovered()) {
final MessageEvaluationContext msgContext = new MessageEvaluationContext();
msgContext.setDestination(destination);
store.recoverSubscription(clientId, subscriptionName, new MessageRecoveryListener() {
@ -182,8 +180,8 @@ public class Topic implements Destination {
message.setRegionDestination(Topic.this);
try {
msgContext.setMessageReference(message);
if (sub.matches(message, msgContext)) {
sub.add(message);
if (subscription.matches(message, msgContext)) {
subscription.add(message);
}
}
catch (InterruptedException e) {
@ -200,17 +198,15 @@ public class Topic implements Destination {
}
});
if( initialActivation && sub.getConsumerInfo().isRetroactive() ) {
// Then use the subscriptionRecoveryPolicy since there will not be any messages in the persistent store.
if( persistenceWasOptimized ) {
subscriptionRecoveryPolicy.recover(context, this, sub);
if( true && subscription.getConsumerInfo().isRetroactive() ) {
// If nothing was in the persistent store, then try to use the recovery policy.
if( subscription.getEnqueueCounter() == 0 ) {
subscriptionRecoveryPolicy.recover(context, this, subscription);
} else {
// TODO: implement something like
// subscriptionRecoveryPolicy.recoverNonPersistent(context, this, sub);
}
}
}
}
}
finally {
@ -218,14 +214,14 @@ public class Topic implements Destination {
}
}
public void removeSubscription(ConnectionContext context, Subscription sub) throws Throwable {
destinationStatistics.getConsumers().decrement();
public void deactivate(ConnectionContext context, DurableTopicSubscription sub) throws Throwable {
synchronized(consumers) {
consumers.remove(sub);
}
sub.remove(context, this);
}
public void send(final ConnectionContext context, final Message message) throws Throwable {
if (context.isProducerFlowControl())
@ -259,18 +255,7 @@ public class Topic implements Destination {
}
private boolean canOptimizeOutPersistence() {
return durableSubscriberCounter.get()==0;
}
public void createSubscription(SubscriptionKey key) {
durableSubscriberCounter.incrementAndGet();
}
public void deleteSubscription(ConnectionContext context, SubscriptionKey key) throws IOException {
if (store != null) {
store.deleteSubscription(key.clientId, key.subscriptionName);
durableSubscriberCounter.decrementAndGet();
}
return durableSubcribers.size()==0;
}
public String toString() {
@ -424,4 +409,5 @@ public class Topic implements Destination {
}
}
}

View File

@ -86,19 +86,21 @@ public class TopicRegion extends AbstractRegion {
super.removeConsumer(context, sub.getConsumerInfo());
super.addConsumer(context, info);
sub = (DurableTopicSubscription) durableSubscriptions.get(key);
}
else {
// Change the consumer id key of the durable sub.
if( sub.getConsumerInfo().getConsumerId()!=null )
subscriptions.remove(sub.getConsumerInfo().getConsumerId());
subscriptions.put(info.getConsumerId(), sub);
sub.activate(context, info);
}
}
else {
super.addConsumer(context, info);
sub = (DurableTopicSubscription) durableSubscriptions.get(key);
}
sub.activate(context, info);
}
else {
super.addConsumer(context, info);
@ -145,7 +147,7 @@ public class TopicRegion extends AbstractRegion {
// Implementation methods
// -------------------------------------------------------------------------
protected Destination createDestination(ActiveMQDestination destination) throws Throwable {
protected Destination createDestination(ConnectionContext context, ActiveMQDestination destination) throws Throwable {
TopicMessageStore store = persistenceAdapter.createTopicMessageStore((ActiveMQTopic) destination);
Topic topic = new Topic(destination, store, memoryManager, destinationStatistics, taskRunnerFactory);
configureTopic(topic, destination);
@ -154,14 +156,31 @@ public class TopicRegion extends AbstractRegion {
if (store != null) {
SubscriptionInfo[] infos = store.getAllSubscriptions();
for (int i = 0; i < infos.length; i++) {
log.info("Restoring durable subscription: "+infos[i]);
createDurableSubscription(topic, infos[i]);
SubscriptionInfo info = infos[i];
log.debug("Restoring durable subscription: "+infos);
SubscriptionKey key = new SubscriptionKey(info);
// A single durable sub may be subscribing to multiple topics. so it might exist already.
DurableTopicSubscription sub = (DurableTopicSubscription) durableSubscriptions.get(key);
if( sub == null ) {
sub = (DurableTopicSubscription) createSubscription(context, createInactiveConsumerInfo(info));
}
topic.addInactiveSubscription(context, sub);
}
}
return topic;
}
private static ConsumerInfo createInactiveConsumerInfo(SubscriptionInfo info) {
ConsumerInfo rc = new ConsumerInfo();
rc.setSelector(info.getSelector());
rc.setSubcriptionName(info.getSubcriptionName());
rc.setDestination(info.getDestination());
return rc;
}
protected void configureTopic(Topic topic, ActiveMQDestination destination) {
if (policyMap != null) {
PolicyEntry entry = policyMap.getEntryFor(destination);
@ -189,17 +208,6 @@ public class TopicRegion extends AbstractRegion {
}
}
public Subscription createDurableSubscription(Topic topic, SubscriptionInfo info) throws Throwable {
SubscriptionKey key = new SubscriptionKey(info.getClientId(), info.getSubcriptionName());
topic.createSubscription(key);
DurableTopicSubscription sub = (DurableTopicSubscription) durableSubscriptions.get(key);
sub = new DurableTopicSubscription(broker,info);
sub.add(null, topic);
durableSubscriptions.put(key, sub);
return sub;
}
/**
*/
private boolean hasDurableSubChanged(ConsumerInfo info1, ConsumerInfo info2) {

View File

@ -16,17 +16,26 @@
*/
package org.apache.activemq.util;
import org.apache.activemq.command.SubscriptionInfo;
public class SubscriptionKey {
public final String clientId;
public final String subscriptionName;
private final int hashValue;
public SubscriptionKey(SubscriptionInfo info) {
this(info.getClientId(), info.getSubcriptionName());
}
public SubscriptionKey(String clientId, String subscriptionName) {
this.clientId = clientId;
this.subscriptionName = subscriptionName;
hashValue = clientId.hashCode()^subscriptionName.hashCode();
}
public int hashCode() {
return hashValue;
}
@ -43,4 +52,12 @@ public class SubscriptionKey {
public String toString() {
return clientId+":"+subscriptionName;
}
public String getClientId() {
return clientId;
}
public String getSubscriptionName() {
return subscriptionName;
}
}

View File

@ -185,7 +185,10 @@ public class JmsRedeliveredTest extends TestCase {
Topic topic = session.createTopic("topic-"+getName());
MessageConsumer consumer = session.createDurableSubscriber(topic, "sub1");
MessageProducer producer = createProducer(session, topic);
// This case only works with persistent messages since transient messages
// are dropped when the consumer goes offline.
MessageProducer producer = session.createProducer(topic);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
producer.send(createTextMessage(session));
// Consume the message...