diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java index 82d8163252..0fc2982639 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java @@ -19,6 +19,7 @@ package org.apache.activemq.broker.region; import java.io.IOException; import java.util.List; import java.util.Set; + import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ProducerBrokerExchange; @@ -39,89 +40,108 @@ import org.apache.activemq.usage.Usage; */ public class DestinationFilter implements Destination { - private final Destination next; + protected final Destination next; public DestinationFilter(Destination next) { this.next = next; } + @Override public void acknowledge(ConnectionContext context, Subscription sub, MessageAck ack, MessageReference node) throws IOException { next.acknowledge(context, sub, ack, node); } + @Override public void addSubscription(ConnectionContext context, Subscription sub) throws Exception { next.addSubscription(context, sub); } + @Override public Message[] browse() { return next.browse(); } + @Override public void dispose(ConnectionContext context) throws IOException { next.dispose(context); } + @Override public boolean isDisposed() { return next.isDisposed(); } + @Override public void gc() { next.gc(); } + @Override public void markForGC(long timeStamp) { next.markForGC(timeStamp); } + @Override public boolean canGC() { return next.canGC(); } + @Override public long getInactiveTimoutBeforeGC() { return next.getInactiveTimoutBeforeGC(); } + @Override public ActiveMQDestination getActiveMQDestination() { return next.getActiveMQDestination(); } + @Override public DeadLetterStrategy getDeadLetterStrategy() { return next.getDeadLetterStrategy(); } + @Override public DestinationStatistics getDestinationStatistics() { return next.getDestinationStatistics(); } + @Override public String getName() { return next.getName(); } + @Override public MemoryUsage getMemoryUsage() { return next.getMemoryUsage(); } - @Override - public void setMemoryUsage(MemoryUsage memoryUsage) { - next.setMemoryUsage(memoryUsage); - } + @Override + public void setMemoryUsage(MemoryUsage memoryUsage) { + next.setMemoryUsage(memoryUsage); + } + @Override public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeliveredSequenceId) throws Exception { next.removeSubscription(context, sub, lastDeliveredSequenceId); } + @Override public void send(ProducerBrokerExchange context, Message messageSend) throws Exception { next.send(context, messageSend); } + @Override public void start() throws Exception { next.start(); } + @Override public void stop() throws Exception { next.stop(); } + @Override public List getConsumers() { return next.getConsumers(); } @@ -143,102 +163,127 @@ public class DestinationFilter implements Destination { } } + @Override public MessageStore getMessageStore() { return next.getMessageStore(); } + @Override public boolean isProducerFlowControl() { return next.isProducerFlowControl(); } + @Override public void setProducerFlowControl(boolean value) { next.setProducerFlowControl(value); } + @Override public boolean isAlwaysRetroactive() { return next.isAlwaysRetroactive(); } + @Override public void setAlwaysRetroactive(boolean value) { next.setAlwaysRetroactive(value); } + @Override public void setBlockedProducerWarningInterval(long blockedProducerWarningInterval) { next.setBlockedProducerWarningInterval(blockedProducerWarningInterval); } + @Override public long getBlockedProducerWarningInterval() { return next.getBlockedProducerWarningInterval(); } + @Override public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception { next.addProducer(context, info); } + @Override public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception { next.removeProducer(context, info); } + @Override public int getMaxAuditDepth() { return next.getMaxAuditDepth(); } + @Override public int getMaxProducersToAudit() { return next.getMaxProducersToAudit(); } + @Override public boolean isEnableAudit() { return next.isEnableAudit(); } + @Override public void setEnableAudit(boolean enableAudit) { next.setEnableAudit(enableAudit); } + @Override public void setMaxAuditDepth(int maxAuditDepth) { next.setMaxAuditDepth(maxAuditDepth); } + @Override public void setMaxProducersToAudit(int maxProducersToAudit) { next.setMaxProducersToAudit(maxProducersToAudit); } + @Override public boolean isActive() { return next.isActive(); } + @Override public int getMaxPageSize() { return next.getMaxPageSize(); } + @Override public void setMaxPageSize(int maxPageSize) { next.setMaxPageSize(maxPageSize); } + @Override public boolean isUseCache() { return next.isUseCache(); } + @Override public void setUseCache(boolean useCache) { next.setUseCache(useCache); } + @Override public int getMinimumMessageSize() { return next.getMinimumMessageSize(); } + @Override public void setMinimumMessageSize(int minimumMessageSize) { next.setMinimumMessageSize(minimumMessageSize); } + @Override public void wakeup() { next.wakeup(); } + @Override public boolean isLazyDispatch() { return next.isLazyDispatch(); } + @Override public void setLazyDispatch(boolean value) { next.setLazyDispatch(value); } @@ -247,70 +292,87 @@ public class DestinationFilter implements Destination { next.messageExpired(context, prefetchSubscription, node); } + @Override public boolean iterate() { return next.iterate(); } + @Override public void fastProducer(ConnectionContext context, ProducerInfo producerInfo) { next.fastProducer(context, producerInfo); } + @Override public void isFull(ConnectionContext context, Usage usage) { next.isFull(context, usage); } + @Override public void messageConsumed(ConnectionContext context, MessageReference messageReference) { next.messageConsumed(context, messageReference); } + @Override public void messageDelivered(ConnectionContext context, MessageReference messageReference) { next.messageDelivered(context, messageReference); } + @Override public void messageDiscarded(ConnectionContext context, Subscription sub, MessageReference messageReference) { next.messageDiscarded(context, sub, messageReference); } + @Override public void slowConsumer(ConnectionContext context, Subscription subs) { next.slowConsumer(context, subs); } + @Override public void messageExpired(ConnectionContext context, Subscription subs, MessageReference node) { next.messageExpired(context, subs, node); } + @Override public int getMaxBrowsePageSize() { return next.getMaxBrowsePageSize(); } + @Override public void setMaxBrowsePageSize(int maxPageSize) { next.setMaxBrowsePageSize(maxPageSize); } + @Override public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception { next.processDispatchNotification(messageDispatchNotification); } + @Override public int getCursorMemoryHighWaterMark() { return next.getCursorMemoryHighWaterMark(); } + @Override public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark) { next.setCursorMemoryHighWaterMark(cursorMemoryHighWaterMark); } + @Override public boolean isPrioritizedMessages() { return next.isPrioritizedMessages(); } + @Override public SlowConsumerStrategy getSlowConsumerStrategy() { return next.getSlowConsumerStrategy(); } + @Override public boolean isDoOptimzeMessageStorage() { return next.isDoOptimzeMessageStorage(); } + @Override public void setDoOptimzeMessageStorage(boolean doOptimzeMessageStorage) { next.setDoOptimzeMessageStorage(doOptimzeMessageStorage); } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicRegion.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicRegion.java index 0d01547b79..12e30f42c4 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicRegion.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicRegion.java @@ -31,6 +31,7 @@ import javax.jms.JMSException; import org.apache.activemq.advisory.AdvisorySupport; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.virtual.VirtualTopicInterceptor; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ConnectionId; import org.apache.activemq.command.ConsumerId; @@ -65,6 +66,7 @@ public class TopicRegion extends AbstractRegion { if (broker.getBrokerService().getOfflineDurableSubscriberTaskSchedule() != -1 && broker.getBrokerService().getOfflineDurableSubscriberTimeout() != -1) { this.cleanupTimer = new Timer("ActiveMQ Durable Subscriber Cleanup Timer", true); this.cleanupTask = new TimerTask() { + @Override public void run() { doCleanup(); } @@ -193,10 +195,12 @@ public class TopicRegion extends AbstractRegion { destinationsLock.readLock().lock(); try { for (Destination dest : destinations.values()) { - //Account for virtual destinations if (dest instanceof Topic){ Topic topic = (Topic)dest; topic.deleteSubscription(context, key); + } else if (dest instanceof VirtualTopicInterceptor) { + VirtualTopicInterceptor virtualTopic = (VirtualTopicInterceptor) dest; + virtualTopic.getTopic().deleteSubscription(context, key); } } } finally { diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java index 9493ad6c6a..d959b42c62 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java @@ -19,6 +19,7 @@ package org.apache.activemq.broker.region.virtual; import org.apache.activemq.broker.ProducerBrokerExchange; import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.DestinationFilter; +import org.apache.activemq.broker.region.Topic; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.Message; @@ -27,15 +28,13 @@ import org.apache.activemq.util.LRUCache; /** * A Destination which implements Virtual Topic - * - * */ public class VirtualTopicInterceptor extends DestinationFilter { - private String prefix; - private String postfix; - private boolean local; - private LRUCache cache = new LRUCache(); + private final String prefix; + private final String postfix; + private final boolean local; + private final LRUCache cache = new LRUCache(); public VirtualTopicInterceptor(Destination next, String prefix, String postfix, boolean local) { super(next); @@ -44,6 +43,11 @@ public class VirtualTopicInterceptor extends DestinationFilter { this.local = local; } + public Topic getTopic() { + return (Topic) this.next; + } + + @Override public void send(ProducerBrokerExchange context, Message message) throws Exception { if (!message.isAdvisory() && !(local && message.getBrokerPath() != null)) { ActiveMQDestination queueConsumers = getQueueConsumersWildcard(message.getDestination()); diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4356Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4356Test.java new file mode 100644 index 0000000000..aa3ac2c1a4 --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4356Test.java @@ -0,0 +1,142 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.bugs; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +import javax.jms.Connection; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Session; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTopic; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class AMQ4356Test { + + private static BrokerService brokerService; + private static String BROKER_ADDRESS = "tcp://localhost:0"; + + private String connectionUri; + private ActiveMQConnectionFactory cf; + private final String CLIENT_ID = "AMQ4356Test"; + private final String SUBSCRIPTION_NAME = "AMQ4356Test"; + + private void createBroker(boolean deleteOnStart) throws Exception { + brokerService = new BrokerService(); + brokerService.setUseJmx(true); + brokerService.setDeleteAllMessagesOnStartup(deleteOnStart); + connectionUri = brokerService.addConnector(BROKER_ADDRESS).getPublishableConnectString(); + brokerService.start(); + brokerService.waitUntilStarted(); + + } + + private void startBroker() throws Exception { + createBroker(true); + } + + private void restartBroker() throws Exception { + brokerService.stop(); + brokerService.waitUntilStopped(); + createBroker(false); + } + + @Before + public void setUp() throws Exception { + startBroker(); + cf = new ActiveMQConnectionFactory(connectionUri); + } + + @After + public void tearDown() throws Exception { + brokerService.stop(); + brokerService.waitUntilStopped(); + } + + @Test + public void testVirtualTopicUnsubDurable() throws Exception { + Connection connection = cf.createConnection(); + connection.setClientID(CLIENT_ID); + connection.start(); + + // create consumer 'cluster' + ActiveMQQueue queue1 = new ActiveMQQueue(getVirtualTopicConsumerName()); + ActiveMQQueue queue2 = new ActiveMQQueue(getVirtualTopicConsumerName()); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer c1 = session.createConsumer(queue1); + c1.setMessageListener(new MessageListener() { + @Override + public void onMessage(Message message) { + } + }); + MessageConsumer c2 = session.createConsumer(queue2); + c2.setMessageListener(new MessageListener() { + @Override + public void onMessage(Message message) { + } + }); + + ActiveMQTopic topic = new ActiveMQTopic(getVirtualTopicName()); + MessageConsumer c3 = session.createDurableSubscriber(topic, SUBSCRIPTION_NAME); + + assertEquals(1, brokerService.getAdminView().getDurableTopicSubscribers().length); + assertEquals(0, brokerService.getAdminView().getInactiveDurableTopicSubscribers().length); + + c3.close(); + + // create topic producer + MessageProducer producer = session.createProducer(topic); + assertNotNull(producer); + + int total = 10; + for (int i = 0; i < total; i++) { + producer.send(session.createTextMessage("message: " + i)); + } + + assertEquals(0, brokerService.getAdminView().getDurableTopicSubscribers().length); + assertEquals(1, brokerService.getAdminView().getInactiveDurableTopicSubscribers().length); + + session.unsubscribe(SUBSCRIPTION_NAME); + connection.close(); + + assertEquals(0, brokerService.getAdminView().getDurableTopicSubscribers().length); + assertEquals(0, brokerService.getAdminView().getInactiveDurableTopicSubscribers().length); + + restartBroker(); + + assertEquals(0, brokerService.getAdminView().getDurableTopicSubscribers().length); + assertEquals(0, brokerService.getAdminView().getInactiveDurableTopicSubscribers().length); + } + + protected String getVirtualTopicName() { + return "VirtualTopic.TEST"; + } + + protected String getVirtualTopicConsumerName() { + return "Consumer.A.VirtualTopic.TEST"; + } +}