diff --git a/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java b/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java index ae05b2c934..c8f47675c7 100755 --- a/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java +++ b/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java @@ -136,7 +136,8 @@ public class AdvisoryBroker extends BrokerFilter { } public Destination addDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception { - Destination answer = next.addDestination(context, destination); + Destination answer = next.addDestination(context, destination); + ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(destination); DestinationInfo info = new DestinationInfo(context.getConnectionId(), DestinationInfo.ADD_OPERATION_TYPE, destination); fireAdvisory(context, topic, info); @@ -153,6 +154,21 @@ public class AdvisoryBroker extends BrokerFilter { fireAdvisory(context, topic, info); } } + + public void addDestinationInfo(ConnectionContext context,DestinationInfo info) throws Exception{ + ActiveMQDestination destination = info.getDestination(); + next.addDestinationInfo(context, info); + + ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(destination); + fireAdvisory(context, topic, info); + destinations.put(destination, info); + } + + public void removeDestinationInfo(ConnectionContext context,DestinationInfo info) throws Exception{ + next.removeDestinationInfo(context, info); + ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(info.getDestination()); + fireAdvisory(context, topic, info); + } public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception { next.removeConnection(context, info, error); diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java b/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java index 779c3fe5dd..92cbcd2cf5 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java @@ -139,24 +139,24 @@ public abstract class AbstractConnection implements Service, Connection, Task, C this.processDispatch(connector.getBrokerInfo()); } - public void stop() throws Exception { - if( disposed) + public void stop() throws Exception{ + if(disposed) return; - disposed=true; // // Remove all logical connection associated with this connection // from the broker. - ArrayList l = new ArrayList(connectionStates.keySet()); - for (Iterator iter = l.iterator(); iter.hasNext();) { - ConnectionId connectionId = (ConnectionId) iter.next(); - try { - processRemoveConnection(connectionId); - } catch (Throwable ignore) { + if(!broker.isStopped()){ + ArrayList l=new ArrayList(connectionStates.keySet()); + for(Iterator iter=l.iterator();iter.hasNext();){ + ConnectionId connectionId=(ConnectionId) iter.next(); + try{ + processRemoveConnection(connectionId); + }catch(Throwable ignore){} + } + if(brokerInfo!=null){ + broker.removeBroker(this,brokerInfo); } - } - if (brokerInfo != null){ - broker.removeBroker(this, brokerInfo); } } @@ -364,7 +364,7 @@ public abstract class AbstractConnection implements Service, Connection, Task, C public Response processAddDestination(DestinationInfo info) throws Exception { ConnectionState cs = lookupConnectionState(info.getConnectionId()); - broker.addDestination(cs.getContext(), info.getDestination()); + broker.addDestinationInfo(cs.getContext(), info); if( info.getDestination().isTemporary() ) { cs.addTempDestination(info.getDestination()); } @@ -373,7 +373,7 @@ public abstract class AbstractConnection implements Service, Connection, Task, C public Response processRemoveDestination(DestinationInfo info) throws Exception { ConnectionState cs = lookupConnectionState(info.getConnectionId()); - broker.removeDestination(cs.getContext(), info.getDestination(), info.getTimeout()); + broker.removeDestinationInfo(cs.getContext(), info); if( info.getDestination().isTemporary() ) { cs.removeTempDestination(info.getDestination()); } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java b/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java index de539e99d3..211fa17c42 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java @@ -18,11 +18,13 @@ package org.apache.activemq.broker; import java.util.Set; import org.apache.activemq.Service; +import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.Region; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.BrokerId; import org.apache.activemq.command.BrokerInfo; import org.apache.activemq.command.ConnectionInfo; +import org.apache.activemq.command.DestinationInfo; import org.apache.activemq.command.MessageDispatch; import org.apache.activemq.command.MessageDispatchNotification; import org.apache.activemq.command.ProducerInfo; @@ -214,4 +216,22 @@ public interface Broker extends Region, Service { */ public Set getDurableDestinations(); + /** + * Add and process a DestinationInfo object + * @param context + * @param info + * @throws Exception + */ + public void addDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception; + + + /** + * Remove and process a DestinationInfo object + * @param context + * @param info + * @throws Exception + */ + public void removeDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception; + + } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java b/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java index 460b7491c8..f87823f4ff 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java @@ -24,6 +24,7 @@ import org.apache.activemq.command.BrokerId; import org.apache.activemq.command.BrokerInfo; import org.apache.activemq.command.ConnectionInfo; import org.apache.activemq.command.ConsumerInfo; +import org.apache.activemq.command.DestinationInfo; import org.apache.activemq.command.Message; import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageDispatch; @@ -193,5 +194,15 @@ public class BrokerFilter implements Broker { public Set getDurableDestinations(){ return next.getDurableDestinations(); } + + public void addDestinationInfo(ConnectionContext context,DestinationInfo info) throws Exception{ + next.addDestinationInfo(context, info); + + } + + public void removeDestinationInfo(ConnectionContext context,DestinationInfo info) throws Exception{ + next.removeDestinationInfo(context, info); + + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java b/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java index c64e2f630f..7cf5ffda64 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java @@ -24,6 +24,7 @@ import org.apache.activemq.command.BrokerId; import org.apache.activemq.command.BrokerInfo; import org.apache.activemq.command.ConnectionInfo; import org.apache.activemq.command.ConsumerInfo; +import org.apache.activemq.command.DestinationInfo; import org.apache.activemq.command.Message; import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageDispatch; @@ -192,4 +193,11 @@ public class EmptyBroker implements Broker{ return null; } + public void addDestinationInfo(ConnectionContext context,DestinationInfo info) throws Exception{ + } + + public void removeDestinationInfo(ConnectionContext context,DestinationInfo info) throws Exception{ + } + + } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java b/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java index 36c6d9df56..21d3b707cc 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java @@ -24,6 +24,7 @@ import org.apache.activemq.command.BrokerId; import org.apache.activemq.command.BrokerInfo; import org.apache.activemq.command.ConnectionInfo; import org.apache.activemq.command.ConsumerInfo; +import org.apache.activemq.command.DestinationInfo; import org.apache.activemq.command.Message; import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageDispatch; @@ -189,6 +190,16 @@ public class ErrorBroker implements Broker { public Set getDurableDestinations(){ throw new IllegalStateException(this.message); } + + public void addDestinationInfo(ConnectionContext context,DestinationInfo info) throws Exception{ + throw new IllegalStateException(this.message); + + } + + public void removeDestinationInfo(ConnectionContext context,DestinationInfo info) throws Exception{ + throw new IllegalStateException(this.message); + + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java b/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java index d881b6fe1e..d38b1ed58d 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java @@ -24,6 +24,7 @@ import org.apache.activemq.command.BrokerId; import org.apache.activemq.command.BrokerInfo; import org.apache.activemq.command.ConnectionInfo; import org.apache.activemq.command.ConsumerInfo; +import org.apache.activemq.command.DestinationInfo; import org.apache.activemq.command.Message; import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageDispatch; @@ -204,4 +205,14 @@ public class MutableBrokerFilter implements Broker { return getNext().getDurableDestinations(); } + public void addDestinationInfo(ConnectionContext context,DestinationInfo info) throws Exception{ + getNext().addDestinationInfo(context, info); + + } + + public void removeDestinationInfo(ConnectionContext context,DestinationInfo info) throws Exception{ + getNext().removeDestinationInfo(context, info); + + } + } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java index 1f5357dbde..b55a934937 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java @@ -29,6 +29,7 @@ import org.apache.activemq.command.BrokerId; import org.apache.activemq.command.BrokerInfo; import org.apache.activemq.command.ConnectionInfo; import org.apache.activemq.command.ConsumerInfo; +import org.apache.activemq.command.DestinationInfo; import org.apache.activemq.command.Message; import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageDispatch; @@ -182,8 +183,10 @@ public class RegionBroker implements Broker { } public Destination addDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception { - if( destinations.contains(destination) ) + if( destinations.contains(destination) ){ + System.err.println(brokerService.getBrokerName() + " SPLATYTTTT!!!!"); throw new JMSException("Destination already exists: "+destination); + } Destination answer = null; switch(destination.getDestinationType()) { @@ -230,6 +233,16 @@ public class RegionBroker implements Broker { destinations.remove(destination); } + + public void addDestinationInfo(ConnectionContext context,DestinationInfo info) throws Exception{ + addDestination(context,info.getDestination()); + + } + + public void removeDestinationInfo(ConnectionContext context,DestinationInfo info) throws Exception{ + removeDestination(context,info.getDestination(), info.getTimeout()); + + } public ActiveMQDestination[] getDestinations() throws Exception { ArrayList l = new ArrayList(destinations); diff --git a/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTempDestination.java b/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTempDestination.java index 73c8aa409d..34626bbac9 100755 --- a/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTempDestination.java +++ b/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTempDestination.java @@ -71,6 +71,10 @@ abstract public class ActiveMQTempDestination extends ActiveMQDestination { public String getConnectionId() { return connectionId; } + + public void setConnectionId(String connectionId) { + this.connectionId = connectionId; + } public int getSequenceId() { return sequenceId; diff --git a/activemq-core/src/main/java/org/apache/activemq/network/CompositeDemandForwardingBridge.java b/activemq-core/src/main/java/org/apache/activemq/network/CompositeDemandForwardingBridge.java index f331381c83..9c5f50f4a1 100644 --- a/activemq-core/src/main/java/org/apache/activemq/network/CompositeDemandForwardingBridge.java +++ b/activemq-core/src/main/java/org/apache/activemq/network/CompositeDemandForwardingBridge.java @@ -99,5 +99,9 @@ public class CompositeDemandForwardingBridge extends DemandForwardingBridgeSuppo protected NetworkBridgeFilter createNetworkBridgeFilter(ConsumerInfo info) throws IOException { return new NetworkBridgeFilter(getFromBrokerId(info), networkTTL); } + + protected BrokerId[] getRemoteBrokerPath(){ + return remoteBrokerPath; + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java b/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java index 359b0aa68d..9ee98ef7df 100755 --- a/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java +++ b/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java @@ -60,7 +60,7 @@ public class DemandForwardingBridge extends DemandForwardingBridgeSupport { } protected void addRemoteBrokerToBrokerPath(ConsumerInfo info) { - info.setBrokerPath(appendToBrokerPath(info.getBrokerPath(),remoteBrokerPath)); + info.setBrokerPath(appendToBrokerPath(info.getBrokerPath(),getRemoteBrokerPath())); } protected void serviceLocalBrokerInfo(Command command) throws InterruptedException { @@ -80,4 +80,8 @@ public class DemandForwardingBridge extends DemandForwardingBridgeSupport { protected NetworkBridgeFilter createNetworkBridgeFilter(ConsumerInfo info) throws IOException { return new NetworkBridgeFilter(remoteBrokerPath[0], networkTTL); } + + protected BrokerId[] getRemoteBrokerPath(){ + return remoteBrokerPath; + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java b/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java index 78713df706..16c4032841 100644 --- a/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java +++ b/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java @@ -22,6 +22,7 @@ import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean; import org.apache.activemq.advisory.AdvisorySupport; import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQTempDestination; import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.command.BrokerId; import org.apache.activemq.command.BrokerInfo; @@ -31,6 +32,7 @@ import org.apache.activemq.command.ConnectionInfo; import org.apache.activemq.command.ConsumerId; import org.apache.activemq.command.ConsumerInfo; import org.apache.activemq.command.DataStructure; +import org.apache.activemq.command.DestinationInfo; import org.apache.activemq.command.ExceptionResponse; import org.apache.activemq.command.KeepAliveInfo; import org.apache.activemq.command.Message; @@ -55,6 +57,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import java.io.IOException; +import javax.jms.TemporaryTopic; /** * A useful base class for implementing demand forwarding bridges. @@ -211,7 +214,13 @@ public abstract class DemandForwardingBridgeSupport implements Bridge { +destinationFilter)); demandConsumerInfo.setPrefetchSize(prefetchSize); remoteBroker.oneway(demandConsumerInfo); - + + //we want infomation about Destinations as well + ConsumerInfo destinationInfo = new ConsumerInfo(remoteSessionInfo,2); + destinationInfo.setDestination(AdvisorySupport.TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC); + destinationInfo.setPrefetchSize(prefetchSize); + remoteBroker.oneway(destinationInfo); + startedLatch.countDown(); } } @@ -322,6 +331,32 @@ public abstract class DemandForwardingBridgeSupport implements Bridge { if(log.isTraceEnabled()) log.trace("Ignoring sub " + info + " already subscribed to matching destination"); } + }else if (data.getClass()==DestinationInfo.class){ +// It's a destination info - we want to pass up + //infomation about temporary destinations + DestinationInfo destInfo = (DestinationInfo) data; + BrokerId[] path=destInfo.getBrokerPath(); + if((path!=null&&path.length>= networkTTL)){ + if(log.isTraceEnabled()) + log.trace("Ignoring Subscription " + destInfo + " restricted to " + networkTTL + " network hops only"); + return; + } + if(contains(destInfo.getBrokerPath(),localBrokerPath[0])){ + // Ignore this consumer as it's a consumer we locally sent to the broker. + if(log.isTraceEnabled()) + log.trace("Ignoring sub " + destInfo + " already routed through this broker once"); + return; + } + + destInfo.setConnectionId(localConnectionInfo.getConnectionId()); + if (destInfo.getDestination() instanceof ActiveMQTempDestination){ + //re-set connection id so comes from here + ActiveMQTempDestination tempDest = (ActiveMQTempDestination) destInfo.getDestination(); + tempDest.setConnectionId(localSessionInfo.getSessionId().getConnectionId()); + } + destInfo.setBrokerPath(appendToBrokerPath(destInfo.getBrokerPath(),getRemoteBrokerPath())); + localBroker.oneway(destInfo); + } if(data.getClass()==RemoveInfo.class){ ConsumerId id=(ConsumerId) ((RemoveInfo) data).getObjectId(); @@ -339,7 +374,8 @@ public abstract class DemandForwardingBridgeSupport implements Bridge { localBroker.oneway(sub.getLocalInfo()); } } - + + protected void removeSubscription(DemandSubscription sub) throws IOException { if(sub!=null){ subscriptionMapByLocalId.remove(sub.getLocalInfo().getConsumerId()); @@ -732,5 +768,7 @@ public abstract class DemandForwardingBridgeSupport implements Bridge { protected abstract void addRemoteBrokerToBrokerPath(ConsumerInfo info) throws IOException; protected abstract void serviceRemoteBrokerInfo(Command command) throws IOException; + + protected abstract BrokerId[] getRemoteBrokerPath(); } diff --git a/activemq-core/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java b/activemq-core/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java index ac1dfd92bf..86425873ee 100755 --- a/activemq-core/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java @@ -16,10 +16,16 @@ package org.apache.activemq.network; import java.net.URI; import javax.jms.Connection; import javax.jms.DeliveryMode; +import javax.jms.Destination; +import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; +import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.Session; +import javax.jms.TextMessage; +import javax.jms.TopicRequestor; +import javax.jms.TopicSession; import junit.framework.TestCase; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerService; @@ -40,6 +46,37 @@ public class SimpleNetworkTest extends TestCase{ protected ActiveMQTopic included; protected ActiveMQTopic excluded; protected String consumerName="durableSubs"; + + + public void testRequestReply() throws Exception{ + final MessageProducer remoteProducer=remoteSession.createProducer(null); + MessageConsumer remoteConsumer=remoteSession.createConsumer(included); + remoteConsumer.setMessageListener(new MessageListener(){ + public void onMessage(Message msg){ + try{ + TextMessage textMsg=(TextMessage) msg; + String payload="REPLY: "+textMsg.getText(); + Destination replyTo; + replyTo=msg.getJMSReplyTo(); + textMsg.clearBody(); + textMsg.setText(payload); + remoteProducer.send(replyTo,textMsg); + }catch(JMSException e){ + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + }); + + TopicRequestor requestor=new TopicRequestor((TopicSession) localSession,included); + Thread.sleep(2000);//alow for consumer infos to perculate arround + for (int i =0;i < MESSAGE_COUNT; i++){ + TextMessage msg = localSession.createTextMessage("test msg: " +i); + TextMessage result = (TextMessage) requestor.request(msg); + assertNotNull(result); + System.out.println(result.getText()); + } + } public void testFiltering() throws Exception{ MessageConsumer includedConsumer=remoteSession.createConsumer(included); @@ -93,6 +130,8 @@ public class SimpleNetworkTest extends TestCase{ assertNotNull(remoteConsumer.receive(500)); } } + + protected void setUp() throws Exception{ super.setUp(); @@ -114,16 +153,19 @@ public class SimpleNetworkTest extends TestCase{ } protected void doSetUp() throws Exception{ - Resource resource=new ClassPathResource(getLocalBrokerURI()); + Resource resource=new ClassPathResource(getRemoteBrokerURI()); BrokerFactoryBean factory=new BrokerFactoryBean(resource); factory.afterPropertiesSet(); - localBroker=factory.getBroker(); - resource=new ClassPathResource(getRemoteBrokerURI()); + remoteBroker=factory.getBroker(); + remoteBroker.start(); + + resource=new ClassPathResource(getLocalBrokerURI()); factory=new BrokerFactoryBean(resource); factory.afterPropertiesSet(); - remoteBroker=factory.getBroker(); + localBroker=factory.getBroker(); + localBroker.start(); - remoteBroker.start(); + URI localURI=localBroker.getVmConnectorURI(); ActiveMQConnectionFactory fac=new ActiveMQConnectionFactory(localURI); localConnection=fac.createConnection(); diff --git a/activemq-core/src/test/resources/org/apache/activemq/network/localBroker.xml b/activemq-core/src/test/resources/org/apache/activemq/network/localBroker.xml index 6d83643f4f..7647dad4a6 100644 --- a/activemq-core/src/test/resources/org/apache/activemq/network/localBroker.xml +++ b/activemq-core/src/test/resources/org/apache/activemq/network/localBroker.xml @@ -23,7 +23,7 @@ - + dynamicOnly = false conduitSubscriptions = true decreaseNetworkConsumerPriority = false diff --git a/activemq-core/src/test/resources/org/apache/activemq/network/remoteBroker.xml b/activemq-core/src/test/resources/org/apache/activemq/network/remoteBroker.xml index 329b085039..13d9b6b1b5 100644 --- a/activemq-core/src/test/resources/org/apache/activemq/network/remoteBroker.xml +++ b/activemq-core/src/test/resources/org/apache/activemq/network/remoteBroker.xml @@ -21,6 +21,9 @@ + + +