git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@388714 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2006-03-25 06:55:33 +00:00
parent f451ad04bf
commit a9c49a7f16
15 changed files with 210 additions and 25 deletions

View File

@ -137,6 +137,7 @@ public class AdvisoryBroker extends BrokerFilter {
public Destination addDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception {
Destination answer = next.addDestination(context, destination);
ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(destination);
DestinationInfo info = new DestinationInfo(context.getConnectionId(), DestinationInfo.ADD_OPERATION_TYPE, destination);
fireAdvisory(context, topic, info);
@ -154,6 +155,21 @@ public class AdvisoryBroker extends BrokerFilter {
}
}
public void addDestinationInfo(ConnectionContext context,DestinationInfo info) throws Exception{
ActiveMQDestination destination = info.getDestination();
next.addDestinationInfo(context, info);
ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(destination);
fireAdvisory(context, topic, info);
destinations.put(destination, info);
}
public void removeDestinationInfo(ConnectionContext context,DestinationInfo info) throws Exception{
next.removeDestinationInfo(context, info);
ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(info.getDestination());
fireAdvisory(context, topic, info);
}
public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception {
next.removeConnection(context, info, error);

View File

@ -142,23 +142,23 @@ public abstract class AbstractConnection implements Service, Connection, Task, C
public void stop() throws Exception{
if(disposed)
return;
disposed=true;
//
// Remove all logical connection associated with this connection
// from the broker.
if(!broker.isStopped()){
ArrayList l=new ArrayList(connectionStates.keySet());
for(Iterator iter=l.iterator();iter.hasNext();){
ConnectionId connectionId=(ConnectionId) iter.next();
try{
processRemoveConnection(connectionId);
} catch (Throwable ignore) {
}
}catch(Throwable ignore){}
}
if(brokerInfo!=null){
broker.removeBroker(this,brokerInfo);
}
}
}
public void serviceTransportException(IOException e) {
if( !disposed ) {
@ -364,7 +364,7 @@ public abstract class AbstractConnection implements Service, Connection, Task, C
public Response processAddDestination(DestinationInfo info) throws Exception {
ConnectionState cs = lookupConnectionState(info.getConnectionId());
broker.addDestination(cs.getContext(), info.getDestination());
broker.addDestinationInfo(cs.getContext(), info);
if( info.getDestination().isTemporary() ) {
cs.addTempDestination(info.getDestination());
}
@ -373,7 +373,7 @@ public abstract class AbstractConnection implements Service, Connection, Task, C
public Response processRemoveDestination(DestinationInfo info) throws Exception {
ConnectionState cs = lookupConnectionState(info.getConnectionId());
broker.removeDestination(cs.getContext(), info.getDestination(), info.getTimeout());
broker.removeDestinationInfo(cs.getContext(), info);
if( info.getDestination().isTemporary() ) {
cs.removeTempDestination(info.getDestination());
}

View File

@ -18,11 +18,13 @@ package org.apache.activemq.broker;
import java.util.Set;
import org.apache.activemq.Service;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.Region;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.BrokerId;
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.DestinationInfo;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageDispatchNotification;
import org.apache.activemq.command.ProducerInfo;
@ -214,4 +216,22 @@ public interface Broker extends Region, Service {
*/
public Set getDurableDestinations();
/**
* Add and process a DestinationInfo object
* @param context
* @param info
* @throws Exception
*/
public void addDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception;
/**
* Remove and process a DestinationInfo object
* @param context
* @param info
* @throws Exception
*/
public void removeDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception;
}

View File

@ -24,6 +24,7 @@ import org.apache.activemq.command.BrokerId;
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.DestinationInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
@ -194,4 +195,14 @@ public class BrokerFilter implements Broker {
return next.getDurableDestinations();
}
public void addDestinationInfo(ConnectionContext context,DestinationInfo info) throws Exception{
next.addDestinationInfo(context, info);
}
public void removeDestinationInfo(ConnectionContext context,DestinationInfo info) throws Exception{
next.removeDestinationInfo(context, info);
}
}

View File

@ -24,6 +24,7 @@ import org.apache.activemq.command.BrokerId;
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.DestinationInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
@ -192,4 +193,11 @@ public class EmptyBroker implements Broker{
return null;
}
public void addDestinationInfo(ConnectionContext context,DestinationInfo info) throws Exception{
}
public void removeDestinationInfo(ConnectionContext context,DestinationInfo info) throws Exception{
}
}

View File

@ -24,6 +24,7 @@ import org.apache.activemq.command.BrokerId;
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.DestinationInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
@ -190,5 +191,15 @@ public class ErrorBroker implements Broker {
throw new IllegalStateException(this.message);
}
public void addDestinationInfo(ConnectionContext context,DestinationInfo info) throws Exception{
throw new IllegalStateException(this.message);
}
public void removeDestinationInfo(ConnectionContext context,DestinationInfo info) throws Exception{
throw new IllegalStateException(this.message);
}
}

View File

@ -24,6 +24,7 @@ import org.apache.activemq.command.BrokerId;
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.DestinationInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
@ -204,4 +205,14 @@ public class MutableBrokerFilter implements Broker {
return getNext().getDurableDestinations();
}
public void addDestinationInfo(ConnectionContext context,DestinationInfo info) throws Exception{
getNext().addDestinationInfo(context, info);
}
public void removeDestinationInfo(ConnectionContext context,DestinationInfo info) throws Exception{
getNext().removeDestinationInfo(context, info);
}
}

View File

@ -29,6 +29,7 @@ import org.apache.activemq.command.BrokerId;
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.DestinationInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
@ -182,8 +183,10 @@ public class RegionBroker implements Broker {
}
public Destination addDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception {
if( destinations.contains(destination) )
if( destinations.contains(destination) ){
System.err.println(brokerService.getBrokerName() + " SPLATYTTTT!!!!");
throw new JMSException("Destination already exists: "+destination);
}
Destination answer = null;
switch(destination.getDestinationType()) {
@ -231,6 +234,16 @@ public class RegionBroker implements Broker {
destinations.remove(destination);
}
public void addDestinationInfo(ConnectionContext context,DestinationInfo info) throws Exception{
addDestination(context,info.getDestination());
}
public void removeDestinationInfo(ConnectionContext context,DestinationInfo info) throws Exception{
removeDestination(context,info.getDestination(), info.getTimeout());
}
public ActiveMQDestination[] getDestinations() throws Exception {
ArrayList l = new ArrayList(destinations);
ActiveMQDestination rc[] = new ActiveMQDestination[l.size()];

View File

@ -72,6 +72,10 @@ abstract public class ActiveMQTempDestination extends ActiveMQDestination {
return connectionId;
}
public void setConnectionId(String connectionId) {
this.connectionId = connectionId;
}
public int getSequenceId() {
return sequenceId;
}

View File

@ -100,4 +100,8 @@ public class CompositeDemandForwardingBridge extends DemandForwardingBridgeSuppo
return new NetworkBridgeFilter(getFromBrokerId(info), networkTTL);
}
protected BrokerId[] getRemoteBrokerPath(){
return remoteBrokerPath;
}
}

View File

@ -60,7 +60,7 @@ public class DemandForwardingBridge extends DemandForwardingBridgeSupport {
}
protected void addRemoteBrokerToBrokerPath(ConsumerInfo info) {
info.setBrokerPath(appendToBrokerPath(info.getBrokerPath(),remoteBrokerPath));
info.setBrokerPath(appendToBrokerPath(info.getBrokerPath(),getRemoteBrokerPath()));
}
protected void serviceLocalBrokerInfo(Command command) throws InterruptedException {
@ -80,4 +80,8 @@ public class DemandForwardingBridge extends DemandForwardingBridgeSupport {
protected NetworkBridgeFilter createNetworkBridgeFilter(ConsumerInfo info) throws IOException {
return new NetworkBridgeFilter(remoteBrokerPath[0], networkTTL);
}
protected BrokerId[] getRemoteBrokerPath(){
return remoteBrokerPath;
}
}

View File

@ -22,6 +22,7 @@ import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTempDestination;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.BrokerId;
import org.apache.activemq.command.BrokerInfo;
@ -31,6 +32,7 @@ import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.DataStructure;
import org.apache.activemq.command.DestinationInfo;
import org.apache.activemq.command.ExceptionResponse;
import org.apache.activemq.command.KeepAliveInfo;
import org.apache.activemq.command.Message;
@ -55,6 +57,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.io.IOException;
import javax.jms.TemporaryTopic;
/**
* A useful base class for implementing demand forwarding bridges.
@ -212,6 +215,12 @@ public abstract class DemandForwardingBridgeSupport implements Bridge {
demandConsumerInfo.setPrefetchSize(prefetchSize);
remoteBroker.oneway(demandConsumerInfo);
//we want infomation about Destinations as well
ConsumerInfo destinationInfo = new ConsumerInfo(remoteSessionInfo,2);
destinationInfo.setDestination(AdvisorySupport.TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC);
destinationInfo.setPrefetchSize(prefetchSize);
remoteBroker.oneway(destinationInfo);
startedLatch.countDown();
}
}
@ -322,6 +331,32 @@ public abstract class DemandForwardingBridgeSupport implements Bridge {
if(log.isTraceEnabled())
log.trace("Ignoring sub " + info + " already subscribed to matching destination");
}
}else if (data.getClass()==DestinationInfo.class){
// It's a destination info - we want to pass up
//infomation about temporary destinations
DestinationInfo destInfo = (DestinationInfo) data;
BrokerId[] path=destInfo.getBrokerPath();
if((path!=null&&path.length>= networkTTL)){
if(log.isTraceEnabled())
log.trace("Ignoring Subscription " + destInfo + " restricted to " + networkTTL + " network hops only");
return;
}
if(contains(destInfo.getBrokerPath(),localBrokerPath[0])){
// Ignore this consumer as it's a consumer we locally sent to the broker.
if(log.isTraceEnabled())
log.trace("Ignoring sub " + destInfo + " already routed through this broker once");
return;
}
destInfo.setConnectionId(localConnectionInfo.getConnectionId());
if (destInfo.getDestination() instanceof ActiveMQTempDestination){
//re-set connection id so comes from here
ActiveMQTempDestination tempDest = (ActiveMQTempDestination) destInfo.getDestination();
tempDest.setConnectionId(localSessionInfo.getSessionId().getConnectionId());
}
destInfo.setBrokerPath(appendToBrokerPath(destInfo.getBrokerPath(),getRemoteBrokerPath()));
localBroker.oneway(destInfo);
}
if(data.getClass()==RemoveInfo.class){
ConsumerId id=(ConsumerId) ((RemoveInfo) data).getObjectId();
@ -340,6 +375,7 @@ public abstract class DemandForwardingBridgeSupport implements Bridge {
}
}
protected void removeSubscription(DemandSubscription sub) throws IOException {
if(sub!=null){
subscriptionMapByLocalId.remove(sub.getLocalInfo().getConsumerId());
@ -733,4 +769,6 @@ public abstract class DemandForwardingBridgeSupport implements Bridge {
protected abstract void serviceRemoteBrokerInfo(Command command) throws IOException;
protected abstract BrokerId[] getRemoteBrokerPath();
}

View File

@ -16,10 +16,16 @@ package org.apache.activemq.network;
import java.net.URI;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.TopicRequestor;
import javax.jms.TopicSession;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
@ -41,6 +47,37 @@ public class SimpleNetworkTest extends TestCase{
protected ActiveMQTopic excluded;
protected String consumerName="durableSubs";
public void testRequestReply() throws Exception{
final MessageProducer remoteProducer=remoteSession.createProducer(null);
MessageConsumer remoteConsumer=remoteSession.createConsumer(included);
remoteConsumer.setMessageListener(new MessageListener(){
public void onMessage(Message msg){
try{
TextMessage textMsg=(TextMessage) msg;
String payload="REPLY: "+textMsg.getText();
Destination replyTo;
replyTo=msg.getJMSReplyTo();
textMsg.clearBody();
textMsg.setText(payload);
remoteProducer.send(replyTo,textMsg);
}catch(JMSException e){
// TODO Auto-generated catch block
e.printStackTrace();
}
}
});
TopicRequestor requestor=new TopicRequestor((TopicSession) localSession,included);
Thread.sleep(2000);//alow for consumer infos to perculate arround
for (int i =0;i < MESSAGE_COUNT; i++){
TextMessage msg = localSession.createTextMessage("test msg: " +i);
TextMessage result = (TextMessage) requestor.request(msg);
assertNotNull(result);
System.out.println(result.getText());
}
}
public void testFiltering() throws Exception{
MessageConsumer includedConsumer=remoteSession.createConsumer(included);
MessageConsumer excludedConsumer=remoteSession.createConsumer(excluded);
@ -94,6 +131,8 @@ public class SimpleNetworkTest extends TestCase{
}
}
protected void setUp() throws Exception{
super.setUp();
doSetUp();
@ -114,16 +153,19 @@ public class SimpleNetworkTest extends TestCase{
}
protected void doSetUp() throws Exception{
Resource resource=new ClassPathResource(getLocalBrokerURI());
Resource resource=new ClassPathResource(getRemoteBrokerURI());
BrokerFactoryBean factory=new BrokerFactoryBean(resource);
factory.afterPropertiesSet();
localBroker=factory.getBroker();
resource=new ClassPathResource(getRemoteBrokerURI());
remoteBroker=factory.getBroker();
remoteBroker.start();
resource=new ClassPathResource(getLocalBrokerURI());
factory=new BrokerFactoryBean(resource);
factory.afterPropertiesSet();
remoteBroker=factory.getBroker();
localBroker=factory.getBroker();
localBroker.start();
remoteBroker.start();
URI localURI=localBroker.getVmConnectorURI();
ActiveMQConnectionFactory fac=new ActiveMQConnectionFactory(localURI);
localConnection=fac.createConnection();

View File

@ -23,7 +23,7 @@
</transportConnectors>
<networkConnectors>
<networkConnector uri="static://(tcp://localhost:61617)">
<networkConnector uri="static:failover:(tcp://localhost:61617)">
dynamicOnly = false
conduitSubscriptions = true
decreaseNetworkConsumerPriority = false

View File

@ -21,6 +21,9 @@
<transportConnectors>
<transportConnector uri="tcp://localhost:61617"/>
</transportConnectors>
<networkConnectors>
<networkConnector uri="static:failover:(tcp://localhost:61616)"/>
</networkConnectors>
</broker>
</beans>