diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java index 3f763e45ae..50cd32411d 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java @@ -267,11 +267,7 @@ public abstract class AbstractRegion implements Region { for (Iterator iter = subscriptions.values().iterator(); iter.hasNext();) { Subscription sub = iter.next(); if (sub.matches(destination) ) { - // may be a new sub created after gc decision, verify if really subscribed - Destination toDelete = destinations.get(destination); - if (toDelete != null && toDelete.getDestinationStatistics().getConsumers().getCount() > 0 ) { - throw new JMSException("Destination still has an active subscription: " + destination); - } + throw new JMSException("Destination still has an active subscription: " + destination); } } } @@ -394,6 +390,8 @@ public abstract class AbstractRegion implements Region { for (Destination dest : (Set) destinationMap.get(info.getDestination())) { addList.add(dest); } + // ensure sub visible to any new dest addSubscriptionsForDestination + subscriptions.put(info.getConsumerId(), sub); } finally { destinationsLock.readLock().unlock(); } @@ -416,6 +414,8 @@ public abstract class AbstractRegion implements Region { LOG.error("Error unsubscribing " + sub + " from " + remove + ": " + ex.getMessage(), ex); } } + subscriptions.remove(info.getConsumerId()); + removeList.clear(); throw e; } } @@ -426,8 +426,6 @@ public abstract class AbstractRegion implements Region { ((QueueBrowserSubscription) sub).destinationsAdded(); } - subscriptions.put(info.getConsumerId(), sub); - return sub; } } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java index ae03d5704f..fa5ae497ac 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java @@ -263,6 +263,7 @@ public abstract class BaseDestination implements Destination { @Override public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeliveredSequenceId) throws Exception{ destinationStatistics.getConsumers().decrement(); + this.lastActiveTime=0l; } @@ -298,9 +299,9 @@ public abstract class BaseDestination implements Destination { @Override public boolean isActive() { - boolean isActive = destinationStatistics.getConsumers().getCount() != 0 || - destinationStatistics.getProducers().getCount() != 0; - if (isActive && isGcWithNetworkConsumers() && destinationStatistics.getConsumers().getCount() != 0) { + boolean isActive = destinationStatistics.getConsumers().getCount() > 0 || + destinationStatistics.getProducers().getCount() > 0; + if (isActive && isGcWithNetworkConsumers() && destinationStatistics.getConsumers().getCount() > 0) { isActive = hasRegularConsumers(getConsumers()); } return isActive; diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java index 7707bf5992..c553e8c113 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java @@ -192,9 +192,12 @@ public class Topic extends BaseDestination implements Task { @Override public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeliveredSequenceId) throws Exception { if (!sub.getConsumerInfo().isDurable()) { - super.removeSubscription(context, sub, lastDeliveredSequenceId); + boolean removed = false; synchronized (consumers) { - consumers.remove(sub); + removed = consumers.remove(sub); + } + if (removed) { + super.removeSubscription(context, sub, lastDeliveredSequenceId); } } sub.remove(context, this); diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/DestinationGCStressTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/DestinationGCStressTest.java index 8f7b1236ab..c6f8409c62 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/DestinationGCStressTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/DestinationGCStressTest.java @@ -42,6 +42,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; public class DestinationGCStressTest { @@ -168,7 +169,7 @@ public class DestinationGCStressTest { log4jLogger.addAppender(appender); try { - final AtomicInteger max = new AtomicInteger(10000); + final AtomicInteger max = new AtomicInteger(20000); final ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost?create=false"); factory.setWatchTopicAdvisories(false); @@ -201,13 +202,12 @@ public class DestinationGCStressTest { executorService.submit(new Runnable() { @Override public void run() { - for (int i = 0; i < 100; i++) { + for (int i = 0; i < 1000; i++) { try { MessageConsumer messageConsumer = session.createConsumer(new ActiveMQTopic(">")); messageConsumer.close(); } catch (Exception ignored) { - ignored.printStackTrace(); } } } @@ -226,4 +226,75 @@ public class DestinationGCStressTest { assertFalse("failed on unexpected log event", failed.get()); } + + @Test(timeout = 60000) + public void testAllDestsSeeSub() throws Exception { + + final AtomicInteger foundDestWithMissingSub = new AtomicInteger(0); + + final AtomicInteger max = new AtomicInteger(20000); + + final ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost?create=false"); + factory.setWatchTopicAdvisories(false); + Connection connection = factory.createConnection(); + connection.start(); + final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + ExecutorService executorService = Executors.newCachedThreadPool(); + for (int i = 0; i < 1; i++) { + executorService.submit(new Runnable() { + @Override + public void run() { + try { + Connection c = factory.createConnection(); + c.start(); + Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = s.createProducer(null); + Message message = s.createTextMessage(); + int j; + while ((j = max.decrementAndGet()) > 0) { + producer.send(new ActiveMQTopic("A." + j), message); + } + } catch (Exception ignored) { + ignored.printStackTrace(); + } + } + }); + } + + executorService.submit(new Runnable() { + @Override + public void run() { + for (int i = 0; i < 1000; i++) { + try { + MessageConsumer messageConsumer = session.createConsumer(new ActiveMQTopic(">")); + if (destMissingSub(foundDestWithMissingSub)) { + break; + } + messageConsumer.close(); + + } catch (Exception ignored) { + } + } + } + }); + + executorService.shutdown(); + executorService.awaitTermination(60, TimeUnit.SECONDS); + connection.close(); + + assertEquals("no dests missing sub", 0, foundDestWithMissingSub.get()); + + } + + private boolean destMissingSub(AtomicInteger tally) { + for (Destination destination : + ((RegionBroker)brokerService.getRegionBroker()).getTopicRegion().getDestinationMap().values()) { + if (destination.getConsumers().isEmpty()) { + tally.incrementAndGet(); + return true; + } + } + return false; + } }