git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@397613 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2006-04-27 19:11:38 +00:00
parent a17b95182f
commit cb1d21f0a3
5 changed files with 213 additions and 37 deletions

View File

@ -1555,7 +1555,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
info.setConnectionId(this.info.getConnectionId());
info.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE);
info.setDestination(destination);
info.setTimeout(1000*5);
info.setTimeout(0);
syncSendPacket(info);
}
@ -1590,7 +1590,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
info.setConnectionId(this.info.getConnectionId());
info.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE);
info.setDestination(destination);
info.setTimeout(1000*5);
info.setTimeout(0);
syncSendPacket(info);
}

View File

@ -147,20 +147,6 @@ public class AdvisoryBroker extends BrokerFilter {
return answer;
}
public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception {
next.removeDestination(context, destination, timeout);
ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(destination);
DestinationInfo info = (DestinationInfo) destinations.remove(destination);
if( info !=null && info.getDestination() != null && topic != null) {
info.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE);
fireAdvisory(context, topic, info);
next.removeDestination(context,topic,timeout);
next.removeDestination(context, AdvisorySupport.getConsumerAdvisoryTopic(info.getDestination()), timeout);
next.removeDestination(context, AdvisorySupport.getProducerAdvisoryTopic(info.getDestination()), timeout);
}
}
public void addDestinationInfo(ConnectionContext context,DestinationInfo info) throws Exception{
ActiveMQDestination destination = info.getDestination();
next.addDestinationInfo(context, info);
@ -170,18 +156,45 @@ public class AdvisoryBroker extends BrokerFilter {
destinations.put(destination, info);
}
public void removeDestinationInfo(ConnectionContext context,DestinationInfo info) throws Exception{
next.removeDestinationInfo(context, info);
ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(info.getDestination());
fireAdvisory(context, topic, info);
try {
next.removeDestination(context, AdvisorySupport.getConsumerAdvisoryTopic(info.getDestination()), 0);
} catch (Exception expectedIfDestinationDidNotExistYet) {
public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception {
next.removeDestination(context, destination, timeout);
DestinationInfo info = (DestinationInfo) destinations.remove(destination);
if( info !=null ) {
info.setDestination(destination);
info.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE);
ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(destination);
fireAdvisory(context, topic, info);
try {
next.removeDestination(context, AdvisorySupport.getConsumerAdvisoryTopic(info.getDestination()), -1);
} catch (Exception expectedIfDestinationDidNotExistYet) {
}
try {
next.removeDestination(context, AdvisorySupport.getProducerAdvisoryTopic(info.getDestination()), -1);
} catch (Exception expectedIfDestinationDidNotExistYet) {
}
}
try {
next.removeDestination(context, AdvisorySupport.getProducerAdvisoryTopic(info.getDestination()), 0);
} catch (Exception expectedIfDestinationDidNotExistYet) {
}
public void removeDestinationInfo(ConnectionContext context, DestinationInfo destInfo) throws Exception{
next.removeDestinationInfo(context, destInfo);
DestinationInfo info = (DestinationInfo) destinations.remove(destInfo.getDestination());
if( info !=null ) {
info.setDestination(destInfo.getDestination());
info.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE);
ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(destInfo.getDestination());
fireAdvisory(context, topic, info);
try {
next.removeDestination(context, AdvisorySupport.getConsumerAdvisoryTopic(info.getDestination()), -1);
} catch (Exception expectedIfDestinationDidNotExistYet) {
}
try {
next.removeDestination(context, AdvisorySupport.getProducerAdvisoryTopic(info.getDestination()), -1);
} catch (Exception expectedIfDestinationDidNotExistYet) {
}
}
}
public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception {

View File

@ -17,12 +17,8 @@
package org.apache.activemq.broker.jmx;
import javax.management.ObjectName;
import org.apache.activemq.Service;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.RemoveSubscriptionInfo;
public interface BrokerViewMBean extends Service {

View File

@ -23,7 +23,6 @@ import java.util.Set;
import javax.jms.JMSException;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConsumerInfo;
@ -94,20 +93,41 @@ abstract public class AbstractRegion implements Region {
public void removeDestination(ConnectionContext context,ActiveMQDestination destination,long timeout)
throws Exception{
// The destination cannot be removed if there are any active subscriptions
for(Iterator iter=subscriptions.values().iterator();iter.hasNext();){
Subscription sub=(Subscription) iter.next();
if(sub.matches(destination)){
throw new JMSException("Destination still has an active subscription: "+destination);
// No timeout.. then try to shut down right way, fails if there are current subscribers.
if( timeout == 0 ) {
for(Iterator iter=subscriptions.values().iterator();iter.hasNext();){
Subscription sub=(Subscription) iter.next();
if(sub.matches(destination)){
throw new JMSException("Destination still has an active subscription: "+destination);
}
}
}
if( timeout > 0 ) {
// TODO: implement a way to notify the subscribers that we want to take the down
// the destination and that they should un-subscribe.. Then wait up to timeout time before
// dropping the subscription.
}
log.debug("Removing destination: "+destination);
synchronized(destinationsMutex){
Destination dest=(Destination) destinations.remove(destination);
if(dest!=null){
// timeout<0 or we timed out, we now force any remaining subscriptions to un-subscribe.
for(Iterator iter=subscriptions.values().iterator();iter.hasNext();){
Subscription sub=(Subscription) iter.next();
if(sub.matches(destination)){
dest.removeSubscription(context, sub);
}
}
destinationMap.removeAll(destination);
dest.dispose(context);
dest.stop();
}else{
log.debug("Destination doesn't exist: " + dest);
}

View File

@ -0,0 +1,147 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.advisory;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.Topic;
import org.apache.activemq.EmbeddedBrokerTestSupport;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.command.ActiveMQTempQueue;
import org.apache.activemq.command.ActiveMQTempTopic;
import edu.emory.mathcs.backport.java.util.concurrent.ArrayBlockingQueue;
import edu.emory.mathcs.backport.java.util.concurrent.BlockingQueue;
import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
/**
*
* @version $Revision: 397249 $
*/
public class TempDestDeleteTest extends EmbeddedBrokerTestSupport implements ConsumerListener {
protected int consumerCounter;
protected ConsumerEventSource topicConsumerEventSource;
private ConsumerEventSource queueConsumerEventSource;
protected BlockingQueue eventQueue = new ArrayBlockingQueue(1000);
private Connection connection;
private Session session;
private ActiveMQTempTopic tempTopic;
private ActiveMQTempQueue tempQueue;
public void testDeleteTempTopicDeletesAvisoryTopics() throws Exception {
topicConsumerEventSource.start();
MessageConsumer consumer = createConsumer(tempTopic);
assertConsumerEvent(1, true);
Topic advisoryTopic = AdvisorySupport.getConsumerAdvisoryTopic(tempTopic);
assertTrue( destinationExists(advisoryTopic) );
consumer.close();
// Once we delete the topic, the advisory topic for the destination should also be deleted.
tempTopic.delete();
assertFalse( destinationExists(advisoryTopic) );
}
public void testDeleteTempQueueDeletesAvisoryTopics() throws Exception {
queueConsumerEventSource.start();
MessageConsumer consumer = createConsumer(tempQueue);
assertConsumerEvent(1, true);
Topic advisoryTopic = AdvisorySupport.getConsumerAdvisoryTopic(tempQueue);
assertTrue( destinationExists(advisoryTopic) );
consumer.close();
// Once we delete the queue, the advisory topic for the destination should also be deleted.
tempQueue.delete();
assertFalse( destinationExists(advisoryTopic) );
}
private boolean destinationExists(Destination dest) throws Exception {
RegionBroker rb = (RegionBroker) broker.getBroker().getAdaptor(RegionBroker.class);
return rb.getTopicRegion().getDestinationMap().containsKey(dest)
|| rb.getQueueRegion().getDestinationMap().containsKey(dest)
|| rb.getTempTopicRegion().getDestinationMap().containsKey(dest)
|| rb.getTempQueueRegion().getDestinationMap().containsKey(dest);
}
public void onConsumerEvent(ConsumerEvent event) {
eventQueue.add(event);
}
protected void setUp() throws Exception {
super.setUp();
connection = createConnection();
connection.start();
session = connection.createSession(false, 0);
tempTopic = (ActiveMQTempTopic) session.createTemporaryTopic();
topicConsumerEventSource = new ConsumerEventSource(connection, tempTopic);
topicConsumerEventSource.setConsumerListener(this);
tempQueue = (ActiveMQTempQueue) session.createTemporaryQueue();
queueConsumerEventSource = new ConsumerEventSource(connection, tempQueue);
queueConsumerEventSource.setConsumerListener(this);
}
protected void tearDown() throws Exception {
if (connection != null) {
connection.close();
}
super.tearDown();
}
protected void assertConsumerEvent(int count, boolean started) throws InterruptedException {
ConsumerEvent event = waitForConsumerEvent();
assertEquals("Consumer count", count, event.getConsumerCount());
assertEquals("started", started, event.isStarted());
}
protected MessageConsumer createConsumer(Destination dest) throws JMSException {
final String consumerText = "Consumer: " + (++consumerCounter);
log.info("Creating consumer: " + consumerText + " on destination: " + dest);
MessageConsumer consumer = session.createConsumer(dest);
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
log.info("Received message by: " + consumerText + " message: " + message);
}
});
return consumer;
}
protected ConsumerEvent waitForConsumerEvent() throws InterruptedException {
ConsumerEvent answer = (ConsumerEvent) eventQueue.poll(1000, TimeUnit.MILLISECONDS);
assertTrue("Should have received a consumer event!", answer != null);
return answer;
}
}