diff --git a/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java b/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java index a6ba6fd56e..d2492820ea 100755 --- a/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java +++ b/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java @@ -107,7 +107,7 @@ public class AdvisoryBroker extends BrokerFilter { for (Iterator iter = producers.values().iterator(); iter.hasNext();) { ProducerInfo value = (ProducerInfo) iter.next(); ActiveMQTopic topic = AdvisorySupport.getProducerAdvisoryTopic(value.getDestination()); - fireAdvisory(context, topic, value, info.getConsumerId()); + fireProducerAdvisory(context, topic, value, info.getConsumerId()); } } @@ -177,8 +177,8 @@ public class AdvisoryBroker extends BrokerFilter { // Don't advise advisory topics. if( info.getDestination()!=null && !AdvisorySupport.isAdvisoryTopic(info.getDestination()) ) { ActiveMQTopic topic = AdvisorySupport.getProducerAdvisoryTopic(info.getDestination()); - fireAdvisory(context, topic, info.createRemoveCommand()); producers.remove(info.getProducerId()); + fireProducerAdvisory(context, topic, info.createRemoveCommand()); } } @@ -199,6 +199,15 @@ public class AdvisoryBroker extends BrokerFilter { advisoryMessage.setIntProperty("consumerCount", consumers.size()); fireAdvisory(context, topic, command, targetConsumerId, advisoryMessage); } + + protected void fireProducerAdvisory(ConnectionContext context, ActiveMQTopic topic, Command command) throws Throwable { + fireProducerAdvisory(context, topic, command, null); + } + protected void fireProducerAdvisory(ConnectionContext context, ActiveMQTopic topic, Command command, ConsumerId targetConsumerId) throws Throwable { + ActiveMQMessage advisoryMessage = new ActiveMQMessage(); + advisoryMessage.setIntProperty("producerCount", producers.size()); + fireAdvisory(context, topic, command, targetConsumerId, advisoryMessage); + } protected void fireAdvisory(ConnectionContext context, ActiveMQTopic topic, Command command, ConsumerId targetConsumerId, ActiveMQMessage advisoryMessage) throws Throwable { advisoryMessage.setDataStructure(command); diff --git a/activemq-core/src/main/java/org/apache/activemq/advisory/ProducerEvent.java b/activemq-core/src/main/java/org/apache/activemq/advisory/ProducerEvent.java new file mode 100644 index 0000000000..7a11b0ce75 --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/advisory/ProducerEvent.java @@ -0,0 +1,64 @@ +/** + * + * Copyright 2005-2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.advisory; + +import org.apache.activemq.command.ProducerId; + +import javax.jms.Destination; + +import java.util.EventObject; + +/** + * An event when the number of producers on a given destination changes. + * + * @version $Revision: 359679 $ + */ +public abstract class ProducerEvent extends EventObject { + private static final long serialVersionUID = 2442156576867593780L; + private final Destination destination; + private final ProducerId producerId; + private final int producerCount; + + public ProducerEvent(ProducerEventSource source, Destination destination, ProducerId producerId, int producerCount) { + super(source); + this.destination = destination; + this.producerId = producerId; + this.producerCount = producerCount; + } + + public ProducerEventSource getAdvisor() { + return (ProducerEventSource) getSource(); + } + + public Destination getDestination() { + return destination; + } + + /** + * Returns the current number of producers active at the time this advisory was sent. + * + */ + public int getProducerCount() { + return producerCount; + } + + public ProducerId getProducerId() { + return producerId; + } + + public abstract boolean isStarted(); +} diff --git a/activemq-core/src/main/java/org/apache/activemq/advisory/ProducerEventSource.java b/activemq-core/src/main/java/org/apache/activemq/advisory/ProducerEventSource.java new file mode 100644 index 0000000000..2491804709 --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/advisory/ProducerEventSource.java @@ -0,0 +1,129 @@ +/** + * + * Copyright 2005-2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.advisory; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.Session; +import org.apache.activemq.Service; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQMessage; +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.command.ProducerId; +import org.apache.activemq.command.ProducerInfo; +import org.apache.activemq.command.RemoveInfo; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean; +import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger; + +/** + * An object which can be used to listen to the number of active consumers + * available on a given destination. + * + * @version $Revision: 359679 $ + */ +public class ProducerEventSource implements Service, MessageListener { + private static final Log log = LogFactory.getLog(ProducerEventSource.class); + + private final Connection connection; + private final ActiveMQDestination destination; + private ProducerListener listener; + private AtomicBoolean started = new AtomicBoolean(false); + private AtomicInteger producerCount = new AtomicInteger(); + private Session session; + private MessageConsumer consumer; + + public ProducerEventSource(Connection connection, Destination destination) throws JMSException { + this.connection = connection; + this.destination = ActiveMQDestination.transform(destination); + } + + public void setProducerListener(ProducerListener listener) { + this.listener = listener; + } + + public void start() throws Exception { + if (started.compareAndSet(false, true)) { + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + ActiveMQTopic advisoryTopic = AdvisorySupport.getProducerAdvisoryTopic(destination); + consumer = session.createConsumer(advisoryTopic); + consumer.setMessageListener(this); + } + } + + public void stop() throws Exception { + if (started.compareAndSet(true, false)) { + if (session != null) { + session.close(); + } + } + } + + public void onMessage(Message message) { + if (message instanceof ActiveMQMessage) { + ActiveMQMessage activeMessage = (ActiveMQMessage) message; + Object command = activeMessage.getDataStructure(); + int count = 0; + if (command instanceof ProducerInfo) { + count = producerCount.incrementAndGet(); + count = extractProducerCountFromMessage(message, count); + fireProducerEvent(new ProducerStartedEvent(this, destination, (ProducerInfo) command, count)); + } + else if (command instanceof RemoveInfo) { + RemoveInfo removeInfo = (RemoveInfo) command; + if (removeInfo.isProducerRemove()) { + count = producerCount.decrementAndGet(); + count = extractProducerCountFromMessage(message, count); + fireProducerEvent(new ProducerStoppedEvent(this, destination, (ProducerId) removeInfo.getObjectId(), count)); + } + } + else { + log.warn("Unknown command: " + command); + } + } + else { + log.warn("Unknown message type: " + message + ". Message ignored"); + } + } + + protected int extractProducerCountFromMessage(Message message, int count) { + try { + Object value = message.getObjectProperty("producerCount"); + if (value instanceof Number) { + Number n = (Number) value; + return n.intValue(); + } + log.warn("No producerCount header available on the message: " + message); + } + catch (Exception e) { + log.warn("Failed to extract producerCount from message: " + message + ".Reason: " + e, e); + } + return count; + } + + protected void fireProducerEvent(ProducerEvent event) { + if (listener != null) { + listener.onProducerEvent(event); + } + } + +} diff --git a/activemq-core/src/main/java/org/apache/activemq/advisory/ProducerListener.java b/activemq-core/src/main/java/org/apache/activemq/advisory/ProducerListener.java new file mode 100644 index 0000000000..12582c28f0 --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/advisory/ProducerListener.java @@ -0,0 +1,27 @@ +/** + * + * Copyright 2005-2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.advisory; + +/** + * Listen to the changes in the number of active consumers available for a given destination. + * + * @version $Revision: 359679 $ + */ +public interface ProducerListener { + + public void onProducerEvent(ProducerEvent event); +} diff --git a/activemq-core/src/main/java/org/apache/activemq/advisory/ProducerStartedEvent.java b/activemq-core/src/main/java/org/apache/activemq/advisory/ProducerStartedEvent.java new file mode 100644 index 0000000000..1c6a080498 --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/advisory/ProducerStartedEvent.java @@ -0,0 +1,48 @@ +/** + * + * Copyright 2005-2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.advisory; + +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ProducerInfo; + +/** + * An event when a new consumer has started. + * + * @version $Revision: 359679 $ + */ +public class ProducerStartedEvent extends ProducerEvent { + + private static final long serialVersionUID = 5088138839609391074L; + + private final ProducerInfo consumerInfo; + + public ProducerStartedEvent(ProducerEventSource source, ActiveMQDestination destination, ProducerInfo consumerInfo, int count) { + super(source, destination, consumerInfo.getProducerId(), count); + this.consumerInfo = consumerInfo; + } + + public boolean isStarted() { + return true; + } + + /** + * @return details of the subscription + */ + public ProducerInfo getProducerInfo() { + return consumerInfo; + } +} diff --git a/activemq-core/src/main/java/org/apache/activemq/advisory/ProducerStoppedEvent.java b/activemq-core/src/main/java/org/apache/activemq/advisory/ProducerStoppedEvent.java new file mode 100644 index 0000000000..bfab1a6905 --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/advisory/ProducerStoppedEvent.java @@ -0,0 +1,39 @@ +/** + * + * Copyright 2005-2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.advisory; + +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ProducerId; + +/** + * An event generated when a consumer stops. + * + * @version $Revision: 359679 $ + */ +public class ProducerStoppedEvent extends ProducerEvent { + + private static final long serialVersionUID = 5378835541037193206L; + + public ProducerStoppedEvent(ProducerEventSource source, ActiveMQDestination destination, ProducerId consumerId, int count) { + super(source, destination, consumerId, count); + } + + public boolean isStarted() { + return false; + } + +} diff --git a/activemq-core/src/test/java/org/apache/activemq/advisory/ProducerListenerTest.java b/activemq-core/src/test/java/org/apache/activemq/advisory/ProducerListenerTest.java new file mode 100644 index 0000000000..6e677a685e --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/advisory/ProducerListenerTest.java @@ -0,0 +1,134 @@ +/** + * + * Copyright 2005-2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.advisory; + +import edu.emory.mathcs.backport.java.util.concurrent.ArrayBlockingQueue; +import edu.emory.mathcs.backport.java.util.concurrent.BlockingQueue; +import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit; + +import org.apache.activemq.EmbeddedBrokerTestSupport; +import org.apache.activemq.advisory.ConsumerEvent; +import org.apache.activemq.advisory.ConsumerEventSource; +import org.apache.activemq.advisory.ConsumerListener; + +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Session; + +/** + * + * @version $Revision: 359679 $ + */ +public class ProducerListenerTest extends EmbeddedBrokerTestSupport implements ProducerListener { + + protected Session consumerSession1; + protected Session consumerSession2; + protected int consumerCounter; + protected ProducerEventSource producerEventSource; + protected BlockingQueue eventQueue = new ArrayBlockingQueue(1000); + private Connection connection; + + public void testProducerEvents() throws Exception { + producerEventSource.start(); + + consumerSession1 = createProducer(); + assertConsumerEvent(1, true); + + consumerSession2 = createProducer(); + assertConsumerEvent(2, true); + + consumerSession1.close(); + consumerSession1 = null; + assertConsumerEvent(1, false); + + consumerSession2.close(); + consumerSession2 = null; + assertConsumerEvent(0, false); + } + + public void testListenWhileAlreadyConsumersActive() throws Exception { + consumerSession1 = createProducer(); + consumerSession2 = createProducer(); + + producerEventSource.start(); + assertConsumerEvent(2, true); + assertConsumerEvent(2, true); + + consumerSession1.close(); + consumerSession1 = null; + assertConsumerEvent(1, false); + + consumerSession2.close(); + consumerSession2 = null; + assertConsumerEvent(0, false); + } + + public void onProducerEvent(ProducerEvent event) { + eventQueue.add(event); + } + + protected void setUp() throws Exception { + super.setUp(); + + connection = createConnection(); + connection.start(); + producerEventSource = new ProducerEventSource(connection, destination); + producerEventSource.setProducerListener(this); + } + + protected void tearDown() throws Exception { + if (producerEventSource != null) { + producerEventSource.stop(); + } + if (consumerSession2 != null) { + consumerSession2.close(); + } + if (consumerSession1 != null) { + consumerSession1.close(); + } + if (connection != null) { + connection.close(); + } + super.tearDown(); + } + + protected void assertConsumerEvent(int count, boolean started) throws InterruptedException { + ProducerEvent event = waitForProducerEvent(); + assertEquals("Producer count", count, event.getProducerCount()); + assertEquals("started", started, event.isStarted()); + } + + protected Session createProducer() throws JMSException { + final String consumerText = "Consumer: " + (++consumerCounter); + System.out.println("Creating consumer: " + consumerText + " on destination: " + destination); + + Session answer = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = answer.createProducer(destination); + return answer; + } + + protected ProducerEvent waitForProducerEvent() throws InterruptedException { + ProducerEvent answer = (ProducerEvent) eventQueue.poll(100000, TimeUnit.MILLISECONDS); + assertTrue("Should have received a consumer event!", answer != null); + return answer; + } + +}