The actual Durable subscription wasn't getting removed from the Store so on restart they were recovered.  

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1464729 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2013-04-04 20:30:00 +00:00
parent 1b38caacf4
commit 0054941a53
4 changed files with 224 additions and 12 deletions

View File

@ -19,6 +19,7 @@ package org.apache.activemq.broker.region;
import java.io.IOException;
import java.util.List;
import java.util.Set;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ProducerBrokerExchange;
@ -39,64 +40,78 @@ import org.apache.activemq.usage.Usage;
*/
public class DestinationFilter implements Destination {
private final Destination next;
protected final Destination next;
public DestinationFilter(Destination next) {
this.next = next;
}
@Override
public void acknowledge(ConnectionContext context, Subscription sub, MessageAck ack, MessageReference node) throws IOException {
next.acknowledge(context, sub, ack, node);
}
@Override
public void addSubscription(ConnectionContext context, Subscription sub) throws Exception {
next.addSubscription(context, sub);
}
@Override
public Message[] browse() {
return next.browse();
}
@Override
public void dispose(ConnectionContext context) throws IOException {
next.dispose(context);
}
@Override
public boolean isDisposed() {
return next.isDisposed();
}
@Override
public void gc() {
next.gc();
}
@Override
public void markForGC(long timeStamp) {
next.markForGC(timeStamp);
}
@Override
public boolean canGC() {
return next.canGC();
}
@Override
public long getInactiveTimoutBeforeGC() {
return next.getInactiveTimoutBeforeGC();
}
@Override
public ActiveMQDestination getActiveMQDestination() {
return next.getActiveMQDestination();
}
@Override
public DeadLetterStrategy getDeadLetterStrategy() {
return next.getDeadLetterStrategy();
}
@Override
public DestinationStatistics getDestinationStatistics() {
return next.getDestinationStatistics();
}
@Override
public String getName() {
return next.getName();
}
@Override
public MemoryUsage getMemoryUsage() {
return next.getMemoryUsage();
}
@ -106,22 +121,27 @@ public class DestinationFilter implements Destination {
next.setMemoryUsage(memoryUsage);
}
@Override
public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeliveredSequenceId) throws Exception {
next.removeSubscription(context, sub, lastDeliveredSequenceId);
}
@Override
public void send(ProducerBrokerExchange context, Message messageSend) throws Exception {
next.send(context, messageSend);
}
@Override
public void start() throws Exception {
next.start();
}
@Override
public void stop() throws Exception {
next.stop();
}
@Override
public List<Subscription> getConsumers() {
return next.getConsumers();
}
@ -143,102 +163,127 @@ public class DestinationFilter implements Destination {
}
}
@Override
public MessageStore getMessageStore() {
return next.getMessageStore();
}
@Override
public boolean isProducerFlowControl() {
return next.isProducerFlowControl();
}
@Override
public void setProducerFlowControl(boolean value) {
next.setProducerFlowControl(value);
}
@Override
public boolean isAlwaysRetroactive() {
return next.isAlwaysRetroactive();
}
@Override
public void setAlwaysRetroactive(boolean value) {
next.setAlwaysRetroactive(value);
}
@Override
public void setBlockedProducerWarningInterval(long blockedProducerWarningInterval) {
next.setBlockedProducerWarningInterval(blockedProducerWarningInterval);
}
@Override
public long getBlockedProducerWarningInterval() {
return next.getBlockedProducerWarningInterval();
}
@Override
public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
next.addProducer(context, info);
}
@Override
public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
next.removeProducer(context, info);
}
@Override
public int getMaxAuditDepth() {
return next.getMaxAuditDepth();
}
@Override
public int getMaxProducersToAudit() {
return next.getMaxProducersToAudit();
}
@Override
public boolean isEnableAudit() {
return next.isEnableAudit();
}
@Override
public void setEnableAudit(boolean enableAudit) {
next.setEnableAudit(enableAudit);
}
@Override
public void setMaxAuditDepth(int maxAuditDepth) {
next.setMaxAuditDepth(maxAuditDepth);
}
@Override
public void setMaxProducersToAudit(int maxProducersToAudit) {
next.setMaxProducersToAudit(maxProducersToAudit);
}
@Override
public boolean isActive() {
return next.isActive();
}
@Override
public int getMaxPageSize() {
return next.getMaxPageSize();
}
@Override
public void setMaxPageSize(int maxPageSize) {
next.setMaxPageSize(maxPageSize);
}
@Override
public boolean isUseCache() {
return next.isUseCache();
}
@Override
public void setUseCache(boolean useCache) {
next.setUseCache(useCache);
}
@Override
public int getMinimumMessageSize() {
return next.getMinimumMessageSize();
}
@Override
public void setMinimumMessageSize(int minimumMessageSize) {
next.setMinimumMessageSize(minimumMessageSize);
}
@Override
public void wakeup() {
next.wakeup();
}
@Override
public boolean isLazyDispatch() {
return next.isLazyDispatch();
}
@Override
public void setLazyDispatch(boolean value) {
next.setLazyDispatch(value);
}
@ -247,70 +292,87 @@ public class DestinationFilter implements Destination {
next.messageExpired(context, prefetchSubscription, node);
}
@Override
public boolean iterate() {
return next.iterate();
}
@Override
public void fastProducer(ConnectionContext context, ProducerInfo producerInfo) {
next.fastProducer(context, producerInfo);
}
@Override
public void isFull(ConnectionContext context, Usage<?> usage) {
next.isFull(context, usage);
}
@Override
public void messageConsumed(ConnectionContext context, MessageReference messageReference) {
next.messageConsumed(context, messageReference);
}
@Override
public void messageDelivered(ConnectionContext context, MessageReference messageReference) {
next.messageDelivered(context, messageReference);
}
@Override
public void messageDiscarded(ConnectionContext context, Subscription sub, MessageReference messageReference) {
next.messageDiscarded(context, sub, messageReference);
}
@Override
public void slowConsumer(ConnectionContext context, Subscription subs) {
next.slowConsumer(context, subs);
}
@Override
public void messageExpired(ConnectionContext context, Subscription subs, MessageReference node) {
next.messageExpired(context, subs, node);
}
@Override
public int getMaxBrowsePageSize() {
return next.getMaxBrowsePageSize();
}
@Override
public void setMaxBrowsePageSize(int maxPageSize) {
next.setMaxBrowsePageSize(maxPageSize);
}
@Override
public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception {
next.processDispatchNotification(messageDispatchNotification);
}
@Override
public int getCursorMemoryHighWaterMark() {
return next.getCursorMemoryHighWaterMark();
}
@Override
public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark) {
next.setCursorMemoryHighWaterMark(cursorMemoryHighWaterMark);
}
@Override
public boolean isPrioritizedMessages() {
return next.isPrioritizedMessages();
}
@Override
public SlowConsumerStrategy getSlowConsumerStrategy() {
return next.getSlowConsumerStrategy();
}
@Override
public boolean isDoOptimzeMessageStorage() {
return next.isDoOptimzeMessageStorage();
}
@Override
public void setDoOptimzeMessageStorage(boolean doOptimzeMessageStorage) {
next.setDoOptimzeMessageStorage(doOptimzeMessageStorage);
}

View File

@ -31,6 +31,7 @@ import javax.jms.JMSException;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.virtual.VirtualTopicInterceptor;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.ConsumerId;
@ -65,6 +66,7 @@ public class TopicRegion extends AbstractRegion {
if (broker.getBrokerService().getOfflineDurableSubscriberTaskSchedule() != -1 && broker.getBrokerService().getOfflineDurableSubscriberTimeout() != -1) {
this.cleanupTimer = new Timer("ActiveMQ Durable Subscriber Cleanup Timer", true);
this.cleanupTask = new TimerTask() {
@Override
public void run() {
doCleanup();
}
@ -193,10 +195,12 @@ public class TopicRegion extends AbstractRegion {
destinationsLock.readLock().lock();
try {
for (Destination dest : destinations.values()) {
//Account for virtual destinations
if (dest instanceof Topic){
Topic topic = (Topic)dest;
topic.deleteSubscription(context, key);
} else if (dest instanceof VirtualTopicInterceptor) {
VirtualTopicInterceptor virtualTopic = (VirtualTopicInterceptor) dest;
virtualTopic.getTopic().deleteSubscription(context, key);
}
}
} finally {

View File

@ -19,6 +19,7 @@ package org.apache.activemq.broker.region.virtual;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.DestinationFilter;
import org.apache.activemq.broker.region.Topic;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.Message;
@ -27,15 +28,13 @@ import org.apache.activemq.util.LRUCache;
/**
* A Destination which implements <a
* href="http://activemq.org/site/virtual-destinations.html">Virtual Topic</a>
*
*
*/
public class VirtualTopicInterceptor extends DestinationFilter {
private String prefix;
private String postfix;
private boolean local;
private LRUCache<ActiveMQDestination,ActiveMQQueue> cache = new LRUCache<ActiveMQDestination,ActiveMQQueue>();
private final String prefix;
private final String postfix;
private final boolean local;
private final LRUCache<ActiveMQDestination,ActiveMQQueue> cache = new LRUCache<ActiveMQDestination,ActiveMQQueue>();
public VirtualTopicInterceptor(Destination next, String prefix, String postfix, boolean local) {
super(next);
@ -44,6 +43,11 @@ public class VirtualTopicInterceptor extends DestinationFilter {
this.local = local;
}
public Topic getTopic() {
return (Topic) this.next;
}
@Override
public void send(ProducerBrokerExchange context, Message message) throws Exception {
if (!message.isAdvisory() && !(local && message.getBrokerPath() != null)) {
ActiveMQDestination queueConsumers = getQueueConsumersWildcard(message.getDestination());

View File

@ -0,0 +1,142 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.bugs;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import javax.jms.Connection;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class AMQ4356Test {
private static BrokerService brokerService;
private static String BROKER_ADDRESS = "tcp://localhost:0";
private String connectionUri;
private ActiveMQConnectionFactory cf;
private final String CLIENT_ID = "AMQ4356Test";
private final String SUBSCRIPTION_NAME = "AMQ4356Test";
private void createBroker(boolean deleteOnStart) throws Exception {
brokerService = new BrokerService();
brokerService.setUseJmx(true);
brokerService.setDeleteAllMessagesOnStartup(deleteOnStart);
connectionUri = brokerService.addConnector(BROKER_ADDRESS).getPublishableConnectString();
brokerService.start();
brokerService.waitUntilStarted();
}
private void startBroker() throws Exception {
createBroker(true);
}
private void restartBroker() throws Exception {
brokerService.stop();
brokerService.waitUntilStopped();
createBroker(false);
}
@Before
public void setUp() throws Exception {
startBroker();
cf = new ActiveMQConnectionFactory(connectionUri);
}
@After
public void tearDown() throws Exception {
brokerService.stop();
brokerService.waitUntilStopped();
}
@Test
public void testVirtualTopicUnsubDurable() throws Exception {
Connection connection = cf.createConnection();
connection.setClientID(CLIENT_ID);
connection.start();
// create consumer 'cluster'
ActiveMQQueue queue1 = new ActiveMQQueue(getVirtualTopicConsumerName());
ActiveMQQueue queue2 = new ActiveMQQueue(getVirtualTopicConsumerName());
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer c1 = session.createConsumer(queue1);
c1.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
}
});
MessageConsumer c2 = session.createConsumer(queue2);
c2.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
}
});
ActiveMQTopic topic = new ActiveMQTopic(getVirtualTopicName());
MessageConsumer c3 = session.createDurableSubscriber(topic, SUBSCRIPTION_NAME);
assertEquals(1, brokerService.getAdminView().getDurableTopicSubscribers().length);
assertEquals(0, brokerService.getAdminView().getInactiveDurableTopicSubscribers().length);
c3.close();
// create topic producer
MessageProducer producer = session.createProducer(topic);
assertNotNull(producer);
int total = 10;
for (int i = 0; i < total; i++) {
producer.send(session.createTextMessage("message: " + i));
}
assertEquals(0, brokerService.getAdminView().getDurableTopicSubscribers().length);
assertEquals(1, brokerService.getAdminView().getInactiveDurableTopicSubscribers().length);
session.unsubscribe(SUBSCRIPTION_NAME);
connection.close();
assertEquals(0, brokerService.getAdminView().getDurableTopicSubscribers().length);
assertEquals(0, brokerService.getAdminView().getInactiveDurableTopicSubscribers().length);
restartBroker();
assertEquals(0, brokerService.getAdminView().getDurableTopicSubscribers().length);
assertEquals(0, brokerService.getAdminView().getInactiveDurableTopicSubscribers().length);
}
protected String getVirtualTopicName() {
return "VirtualTopic.TEST";
}
protected String getVirtualTopicConsumerName() {
return "Consumer.A.VirtualTopic.TEST";
}
}