From 687bf43d9fc90263dd778350c0eaf1d7e6c9b81f Mon Sep 17 00:00:00 2001 From: James Strachan Date: Tue, 13 Dec 2005 16:21:32 +0000 Subject: [PATCH] added a ConsumerListener so that you can listen to consumers coming and going easily (hiding the details of the Advisories behind a simple Bean API) as well as be notified on exactly how many consumers there are to be able to drive demand based publishing etc git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@356524 13f79535-47bb-0310-9956-ffa450edef68 --- .../org/activemq/advisory/AdvisoryBroker.java | 24 ++- .../org/activemq/advisory/ConsumerEvent.java | 70 +++++++++ .../advisory/ConsumerEventSource.java | 137 ++++++++++++++++++ .../activemq/advisory/ConsumerListener.java | 28 ++++ .../advisory/ConsumerStartedEvent.java | 49 +++++++ .../advisory/ConsumerStoppedEvent.java | 40 +++++ .../java/org/activemq/advisory/package.html | 2 +- .../advisory/ConsumerListenerTest.java | 136 +++++++++++++++++ 8 files changed, 477 insertions(+), 9 deletions(-) create mode 100644 activemq-core/src/main/java/org/activemq/advisory/ConsumerEvent.java create mode 100644 activemq-core/src/main/java/org/activemq/advisory/ConsumerEventSource.java create mode 100644 activemq-core/src/main/java/org/activemq/advisory/ConsumerListener.java create mode 100644 activemq-core/src/main/java/org/activemq/advisory/ConsumerStartedEvent.java create mode 100644 activemq-core/src/main/java/org/activemq/advisory/ConsumerStoppedEvent.java create mode 100644 activemq-core/src/test/java/org/activemq/advisory/ConsumerListenerTest.java diff --git a/activemq-core/src/main/java/org/activemq/advisory/AdvisoryBroker.java b/activemq-core/src/main/java/org/activemq/advisory/AdvisoryBroker.java index e73fae7447..11eec08145 100755 --- a/activemq-core/src/main/java/org/activemq/advisory/AdvisoryBroker.java +++ b/activemq-core/src/main/java/org/activemq/advisory/AdvisoryBroker.java @@ -74,12 +74,12 @@ public class AdvisoryBroker extends BrokerFilter { public void addConsumer(ConnectionContext context, ConsumerInfo info) throws Throwable { next.addConsumer(context, info); - + // Don't advise advisory topics. if( !AdvisorySupport.isAdvisoryTopic(info.getDestination()) ) { ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(info.getDestination()); - fireAdvisory(context, topic, info); consumers.put(info.getConsumerId(), info); + fireConsumerAdvisory(context, topic, info); } else { // We need to replay all the previously collected state objects @@ -118,7 +118,7 @@ public class AdvisoryBroker extends BrokerFilter { for (Iterator iter = consumers.values().iterator(); iter.hasNext();) { ConsumerInfo value = (ConsumerInfo) iter.next(); ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(value.getDestination()); - fireAdvisory(context, topic, value, info.getConsumerId()); + fireConsumerAdvisory(context, topic, value); } } } @@ -168,8 +168,8 @@ public class AdvisoryBroker extends BrokerFilter { // Don't advise advisory topics. if( !AdvisorySupport.isAdvisoryTopic(info.getDestination()) ) { ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(info.getDestination()); - fireAdvisory(context, topic, info.createRemoveCommand()); consumers.remove(info.getConsumerId()); + fireConsumerAdvisory(context, topic, info.createRemoveCommand()); } } @@ -184,13 +184,22 @@ public class AdvisoryBroker extends BrokerFilter { } } - private void fireAdvisory(ConnectionContext context, ActiveMQTopic topic, Command command) throws Throwable { + protected void fireAdvisory(ConnectionContext context, ActiveMQTopic topic, Command command) throws Throwable { fireAdvisory(context, topic, command, null); } - private void fireAdvisory(ConnectionContext context, ActiveMQTopic topic, Command command, ConsumerId targetConsumerId) throws Throwable { - + protected void fireAdvisory(ConnectionContext context, ActiveMQTopic topic, Command command, ConsumerId targetConsumerId) throws Throwable { ActiveMQMessage advisoryMessage = new ActiveMQMessage(); + fireAdvisory(context, topic, command, targetConsumerId, advisoryMessage); + } + + protected void fireConsumerAdvisory(ConnectionContext context, ActiveMQTopic topic, Command command) throws Throwable { + ActiveMQMessage advisoryMessage = new ActiveMQMessage(); + advisoryMessage.setIntProperty("consumerCount", consumers.size()); + fireAdvisory(context, topic, command, null, advisoryMessage); + } + + protected void fireAdvisory(ConnectionContext context, ActiveMQTopic topic, Command command, ConsumerId targetConsumerId, ActiveMQMessage advisoryMessage) throws Throwable { advisoryMessage.setDataStructure(command); advisoryMessage.setPersistent(false); advisoryMessage.setType(AdvisorySupport.ADIVSORY_MESSAGE_TYPE); @@ -208,7 +217,6 @@ public class AdvisoryBroker extends BrokerFilter { } finally { context.setProducerFlowControl(originalFlowControl); } - } } \ No newline at end of file diff --git a/activemq-core/src/main/java/org/activemq/advisory/ConsumerEvent.java b/activemq-core/src/main/java/org/activemq/advisory/ConsumerEvent.java new file mode 100644 index 0000000000..73b5931504 --- /dev/null +++ b/activemq-core/src/main/java/org/activemq/advisory/ConsumerEvent.java @@ -0,0 +1,70 @@ +/** + * + * Copyright 2005 LogicBlaze, Inc. http://www.logicblaze.com + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + **/ +package org.activemq.advisory; + +import org.activemq.command.ConsumerId; + +import javax.jms.Destination; + +import java.util.EventObject; + +/** + * An event when the number of consumers on a given destination changes. + * + * @version $Revision$ + */ +public abstract class ConsumerEvent extends EventObject { + private static final long serialVersionUID = 2442156576867593780L; + private final Destination destination; + private final ConsumerId consumerId; + private final int consumerCount; + + public ConsumerEvent(ConsumerEventSource source, Destination destination, ConsumerId consumerId, int consumerCount) { + super(source); + this.destination = destination; + this.consumerId = consumerId; + this.consumerCount = consumerCount; + } + + public ConsumerEventSource getAdvisor() { + return (ConsumerEventSource) getSource(); + } + + public Destination getDestination() { + return destination; + } + + /** + * Returns the current number of consumers active at the time this advisory was sent. + * + * Note that this is not the number of consumers active when the consumer started consuming. + * It is usually more vital to know how many consumers there are now - rather than historically + * how many there were when a consumer started. So if you create a {@link ConsumerListener} + * after many consumers have started, you will receive a ConsumerEvent for each consumer. However the + * {@link #getConsumerCount()} method will always return the current active consumer count on each event. + */ + public int getConsumerCount() { + return consumerCount; + } + + public ConsumerId getConsumerId() { + return consumerId; + } + + public abstract boolean isStarted(); +} diff --git a/activemq-core/src/main/java/org/activemq/advisory/ConsumerEventSource.java b/activemq-core/src/main/java/org/activemq/advisory/ConsumerEventSource.java new file mode 100644 index 0000000000..99e60ed7c3 --- /dev/null +++ b/activemq-core/src/main/java/org/activemq/advisory/ConsumerEventSource.java @@ -0,0 +1,137 @@ +/** + * + * Copyright 2005 LogicBlaze, Inc. http://www.logicblaze.com + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + **/ +package org.activemq.advisory; + +import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean; +import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger; + +import org.activemq.Service; +import org.activemq.command.ActiveMQDestination; +import org.activemq.command.ActiveMQMessage; +import org.activemq.command.ActiveMQTopic; +import org.activemq.command.ConsumerId; +import org.activemq.command.ConsumerInfo; +import org.activemq.command.RemoveInfo; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.Session; + +/** + * An object which can be used to listen to the number of active consumers + * available on a given destination. + * + * @version $Revision$ + */ +public class ConsumerEventSource implements Service, MessageListener { + private static final Log log = LogFactory.getLog(ConsumerEventSource.class); + + private final Connection connection; + private final ActiveMQDestination destination; + private ConsumerListener listener; + private AtomicBoolean started = new AtomicBoolean(false); + private AtomicInteger consumerCount = new AtomicInteger(); + private Session session; + private MessageConsumer consumer; + + public ConsumerEventSource(Connection connection, Destination destination) throws JMSException { + this.connection = connection; + this.destination = ActiveMQDestination.transform(destination); + } + + public void setConsumerListener(ConsumerListener listener) { + this.listener = listener; + } + + public void start() throws Exception { + if (started.compareAndSet(false, true)) { + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + ActiveMQTopic advisoryTopic = AdvisorySupport.getConsumerAdvisoryTopic(destination); + consumer = session.createConsumer(advisoryTopic); + consumer.setMessageListener(this); + } + } + + public void stop() throws Exception { + if (started.compareAndSet(true, false)) { + if (session != null) { + session.close(); + } + } + } + + public void onMessage(Message message) { + if (message instanceof ActiveMQMessage) { + ActiveMQMessage activeMessage = (ActiveMQMessage) message; + Object command = activeMessage.getDataStructure(); + int count = 0; + if (command instanceof ConsumerInfo) { + count = consumerCount.incrementAndGet(); + count = extractConsumerCountFromMessage(message, count); + fireConsumerEvent(new ConsumerStartedEvent(this, destination, (ConsumerInfo) command, count)); + } + else if (command instanceof RemoveInfo) { + RemoveInfo removeInfo = (RemoveInfo) command; + if (removeInfo.isConsumerRemove()) { + count = consumerCount.decrementAndGet(); + count = extractConsumerCountFromMessage(message, count); + fireConsumerEvent(new ConsumerStoppedEvent(this, destination, (ConsumerId) removeInfo.getObjectId(), count)); + } + } + else { + log.warn("Unknown command: " + command); + } + } + else { + log.warn("Unknown message type: " + message + ". Message ignored"); + } + } + + /** + * Lets rely by default on the broker telling us what the consumer count is + * as it can ensure that we are up to date at all times and have not + * received messages out of order etc. + */ + protected int extractConsumerCountFromMessage(Message message, int count) { + try { + Object value = message.getObjectProperty("consumerCount"); + if (value instanceof Number) { + Number n = (Number) value; + return n.intValue(); + } + log.warn("No consumerCount header available on the message: " + message); + } + catch (Exception e) { + log.warn("Failed to extract consumerCount from message: " + message + ".Reason: " + e, e); + } + return count; + } + + protected void fireConsumerEvent(ConsumerEvent event) { + if (listener != null) { + listener.onConsumerEvent(event); + } + } + +} diff --git a/activemq-core/src/main/java/org/activemq/advisory/ConsumerListener.java b/activemq-core/src/main/java/org/activemq/advisory/ConsumerListener.java new file mode 100644 index 0000000000..9e3484cab9 --- /dev/null +++ b/activemq-core/src/main/java/org/activemq/advisory/ConsumerListener.java @@ -0,0 +1,28 @@ +/** + * + * Copyright 2005 LogicBlaze, Inc. http://www.logicblaze.com + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + **/ +package org.activemq.advisory; + +/** + * Listen to the changes in the number of active consumers available for a given destination. + * + * @version $Revision$ + */ +public interface ConsumerListener { + + public void onConsumerEvent(ConsumerEvent event); +} diff --git a/activemq-core/src/main/java/org/activemq/advisory/ConsumerStartedEvent.java b/activemq-core/src/main/java/org/activemq/advisory/ConsumerStartedEvent.java new file mode 100644 index 0000000000..5be09de7f6 --- /dev/null +++ b/activemq-core/src/main/java/org/activemq/advisory/ConsumerStartedEvent.java @@ -0,0 +1,49 @@ +/** + * + * Copyright 2005 LogicBlaze, Inc. http://www.logicblaze.com + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + **/ +package org.activemq.advisory; + +import org.activemq.command.ActiveMQDestination; +import org.activemq.command.ConsumerInfo; + +/** + * An event when a new consumer has started. + * + * @version $Revision$ + */ +public class ConsumerStartedEvent extends ConsumerEvent { + + private static final long serialVersionUID = 5088138839609391074L; + + private final ConsumerInfo consumerInfo; + + public ConsumerStartedEvent(ConsumerEventSource source, ActiveMQDestination destination, ConsumerInfo consumerInfo, int count) { + super(source, destination, consumerInfo.getConsumerId(), count); + this.consumerInfo = consumerInfo; + } + + public boolean isStarted() { + return true; + } + + /** + * @return details of the subscription + */ + public ConsumerInfo getConsumerInfo() { + return consumerInfo; + } +} diff --git a/activemq-core/src/main/java/org/activemq/advisory/ConsumerStoppedEvent.java b/activemq-core/src/main/java/org/activemq/advisory/ConsumerStoppedEvent.java new file mode 100644 index 0000000000..f7586259c2 --- /dev/null +++ b/activemq-core/src/main/java/org/activemq/advisory/ConsumerStoppedEvent.java @@ -0,0 +1,40 @@ +/** + * + * Copyright 2005 LogicBlaze, Inc. http://www.logicblaze.com + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + **/ +package org.activemq.advisory; + +import org.activemq.command.ActiveMQDestination; +import org.activemq.command.ConsumerId; + +/** + * An event generated when a consumer stops. + * + * @version $Revision$ + */ +public class ConsumerStoppedEvent extends ConsumerEvent { + + private static final long serialVersionUID = 5378835541037193206L; + + public ConsumerStoppedEvent(ConsumerEventSource source, ActiveMQDestination destination, ConsumerId consumerId, int count) { + super(source, destination, consumerId, count); + } + + public boolean isStarted() { + return false; + } + +} diff --git a/activemq-core/src/main/java/org/activemq/advisory/package.html b/activemq-core/src/main/java/org/activemq/advisory/package.html index 10586a4113..b2ebc435f2 100755 --- a/activemq-core/src/main/java/org/activemq/advisory/package.html +++ b/activemq-core/src/main/java/org/activemq/advisory/package.html @@ -3,7 +3,7 @@ -Support for JMS Advisory messages +Support for JMS Advisory messages as well as some helper listeners to listen to the clients, producers and consumers available. diff --git a/activemq-core/src/test/java/org/activemq/advisory/ConsumerListenerTest.java b/activemq-core/src/test/java/org/activemq/advisory/ConsumerListenerTest.java new file mode 100644 index 0000000000..38b121417e --- /dev/null +++ b/activemq-core/src/test/java/org/activemq/advisory/ConsumerListenerTest.java @@ -0,0 +1,136 @@ +/** + * + * Copyright 2005 LogicBlaze, Inc. http://www.logicblaze.com + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + **/ +package org.activemq.advisory; + +import edu.emory.mathcs.backport.java.util.concurrent.ArrayBlockingQueue; +import edu.emory.mathcs.backport.java.util.concurrent.BlockingQueue; +import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit; + +import org.activemq.EmbeddedBrokerTestSupport; + +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.Session; + +/** + * + * @version $Revision$ + */ +public class ConsumerListenerTest extends EmbeddedBrokerTestSupport implements ConsumerListener { + + protected Session consumerSession1; + protected Session consumerSession2; + protected int consumerCounter; + protected ConsumerEventSource consumerEventSource; + protected BlockingQueue eventQueue = new ArrayBlockingQueue(1000); + private Connection connection; + + public void testConsumerEvents() throws Exception { + consumerEventSource.start(); + + consumerSession1 = createConsumer(); + assertConsumerEvent(1, true); + + consumerSession2 = createConsumer(); + assertConsumerEvent(2, true); + + consumerSession1.close(); + consumerSession1 = null; + assertConsumerEvent(1, false); + + consumerSession2.close(); + consumerSession2 = null; + assertConsumerEvent(0, false); + } + + public void testListenWhileAlreadyConsumersActive() throws Exception { + consumerSession1 = createConsumer(); + consumerSession2 = createConsumer(); + + consumerEventSource.start(); + assertConsumerEvent(2, true); + assertConsumerEvent(2, true); + + consumerSession1.close(); + consumerSession1 = null; + assertConsumerEvent(1, false); + + consumerSession2.close(); + consumerSession2 = null; + assertConsumerEvent(0, false); + } + + public void onConsumerEvent(ConsumerEvent event) { + eventQueue.add(event); + } + + protected void setUp() throws Exception { + super.setUp(); + + connection = createConnection(); + connection.start(); + consumerEventSource = new ConsumerEventSource(connection, destination); + consumerEventSource.setConsumerListener(this); + } + + protected void tearDown() throws Exception { + if (consumerEventSource != null) { + consumerEventSource.stop(); + } + if (consumerSession2 != null) { + consumerSession2.close(); + } + if (consumerSession1 != null) { + consumerSession1.close(); + } + if (connection != null) { + connection.close(); + } + super.tearDown(); + } + + protected void assertConsumerEvent(int count, boolean started) throws InterruptedException { + ConsumerEvent event = waitForConsumerEvent(); + assertEquals("Consumer count", count, event.getConsumerCount()); + assertEquals("started", started, event.isStarted()); + } + + protected Session createConsumer() throws JMSException { + final String consumerText = "Consumer: " + (++consumerCounter); + System.out.println("Creating consumer: " + consumerText + " on destination: " + destination); + + Session answer = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = answer.createConsumer(destination); + consumer.setMessageListener(new MessageListener() { + public void onMessage(Message message) { + System.out.println("Received message by: " + consumerText + " message: " + message); + } + }); + return answer; + } + + protected ConsumerEvent waitForConsumerEvent() throws InterruptedException { + ConsumerEvent answer = (ConsumerEvent) eventQueue.poll(100000, TimeUnit.MILLISECONDS); + assertTrue("Should have received a consumer event!", answer != null); + return answer; + } + +}