diff --git a/activemq-core/src/main/java/org/activemq/ActiveMQConnectionFactory.java b/activemq-core/src/main/java/org/activemq/ActiveMQConnectionFactory.java index 0239d11cca..d793ecc8a3 100755 --- a/activemq-core/src/main/java/org/activemq/ActiveMQConnectionFactory.java +++ b/activemq-core/src/main/java/org/activemq/ActiveMQConnectionFactory.java @@ -66,8 +66,6 @@ public class ActiveMQConnectionFactory implements ConnectionFactory, QueueConnec protected String password; protected String clientID; - protected boolean useEmbeddedBroker; - // optimization flags private ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy(); private boolean disableTimeStampsByDefault = false; @@ -317,14 +315,6 @@ public class ActiveMQConnectionFactory implements ConnectionFactory, QueueConnec this.useAsyncSend = useAsyncSend; } - public boolean isUseEmbeddedBroker() { - return useEmbeddedBroker; - } - - public void setUseEmbeddedBroker(boolean useEmbeddedBroker) { - this.useEmbeddedBroker = useEmbeddedBroker; - } - public String getUserName() { return userName; } diff --git a/activemq-core/src/main/java/org/activemq/advisory/AdvisoryBroker.java b/activemq-core/src/main/java/org/activemq/advisory/AdvisoryBroker.java index 11eec08145..b7e29353a5 100755 --- a/activemq-core/src/main/java/org/activemq/advisory/AdvisoryBroker.java +++ b/activemq-core/src/main/java/org/activemq/advisory/AdvisoryBroker.java @@ -118,7 +118,7 @@ public class AdvisoryBroker extends BrokerFilter { for (Iterator iter = consumers.values().iterator(); iter.hasNext();) { ConsumerInfo value = (ConsumerInfo) iter.next(); ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(value.getDestination()); - fireConsumerAdvisory(context, topic, value); + fireConsumerAdvisory(context, topic, value, info.getConsumerId()); } } } @@ -194,9 +194,12 @@ public class AdvisoryBroker extends BrokerFilter { } protected void fireConsumerAdvisory(ConnectionContext context, ActiveMQTopic topic, Command command) throws Throwable { + fireConsumerAdvisory(context, topic, command, null); + } + protected void fireConsumerAdvisory(ConnectionContext context, ActiveMQTopic topic, Command command, ConsumerId targetConsumerId) throws Throwable { ActiveMQMessage advisoryMessage = new ActiveMQMessage(); advisoryMessage.setIntProperty("consumerCount", consumers.size()); - fireAdvisory(context, topic, command, null, advisoryMessage); + fireAdvisory(context, topic, command, targetConsumerId, advisoryMessage); } protected void fireAdvisory(ConnectionContext context, ActiveMQTopic topic, Command command, ConsumerId targetConsumerId, ActiveMQMessage advisoryMessage) throws Throwable { diff --git a/activemq-core/src/main/java/org/activemq/broker/AbstractConnection.java b/activemq-core/src/main/java/org/activemq/broker/AbstractConnection.java index b80bbafef4..5c35b8a318 100755 --- a/activemq-core/src/main/java/org/activemq/broker/AbstractConnection.java +++ b/activemq-core/src/main/java/org/activemq/broker/AbstractConnection.java @@ -78,7 +78,6 @@ public abstract class AbstractConnection implements Service, Connection, Task, C protected final List dispatchQueue = Collections.synchronizedList(new LinkedList()); protected final TaskRunner taskRunner; protected final Connector connector; - protected boolean demandForwardingBridge; private ConnectionStatistics statistics = new ConnectionStatistics(); protected final ConcurrentHashMap connectionStates = new ConcurrentHashMap(); @@ -326,7 +325,6 @@ public abstract class AbstractConnection implements Service, Connection, Task, C public Response processMessage(Message messageSend) throws Throwable { - messageSend.setRecievedByDFBridge(demandForwardingBridge); broker.send(lookupConnectionState(messageSend.getProducerId()).getContext(), messageSend); return null; } @@ -337,7 +335,6 @@ public abstract class AbstractConnection implements Service, Connection, Task, C } public Response processBrokerInfo(BrokerInfo info) { - demandForwardingBridge = true; return null; } diff --git a/activemq-core/src/main/java/org/activemq/broker/TransportConnection.java b/activemq-core/src/main/java/org/activemq/broker/TransportConnection.java index dec513649b..bad46fe291 100755 --- a/activemq-core/src/main/java/org/activemq/broker/TransportConnection.java +++ b/activemq-core/src/main/java/org/activemq/broker/TransportConnection.java @@ -21,10 +21,6 @@ package org.activemq.broker; import java.io.IOException; import org.activemq.command.Command; -import org.activemq.command.CommandTypes; -import org.activemq.command.ConsumerInfo; -import org.activemq.command.Message; -import org.activemq.command.MessageDispatch; import org.activemq.command.Response; import org.activemq.thread.TaskRunnerFactory; import org.activemq.transport.Transport; @@ -183,34 +179,15 @@ public class TransportConnection extends AbstractConnection { protected void dispatch(Command command){ - if(isValidForNetwork(command)){ - try{ - setMarkedCandidate(true); - transport.oneway(command); - getStatistics().onCommand(command); - }catch(IOException e){ - serviceException(e); - }finally{ - setMarkedCandidate(false); - } + try{ + setMarkedCandidate(true); + transport.oneway(command); + getStatistics().onCommand(command); + }catch(IOException e){ + serviceException(e); + }finally{ + setMarkedCandidate(false); } - } - - protected boolean isValidForNetwork(Command command){ - boolean result=true; - if(demandForwardingBridge&&command.isMessageDispatch()){ - MessageDispatch md=(MessageDispatch) command; - Message message=md.getMessage(); - if(message.isAdvisory()&&message.getDataStructure()!=null - &&message.getDataStructure().getDataStructureType()==CommandTypes.CONSUMER_INFO){ - ConsumerInfo info=(ConsumerInfo) message.getDataStructure(); - if(info.isNetworkSubscription()){ - // don't want to forward these - result=false; - } - } - } - return result; - } + } } diff --git a/activemq-core/src/main/java/org/activemq/command/BaseCommand.java b/activemq-core/src/main/java/org/activemq/command/BaseCommand.java index 3048428e8c..ab3bec3c6c 100755 --- a/activemq-core/src/main/java/org/activemq/command/BaseCommand.java +++ b/activemq-core/src/main/java/org/activemq/command/BaseCommand.java @@ -20,6 +20,7 @@ package org.activemq.command; import java.lang.reflect.Field; import java.lang.reflect.Modifier; +import java.util.Arrays; import java.util.LinkedHashMap; /** @@ -118,7 +119,14 @@ abstract public class BaseCommand implements Command { } try { - map.put(field.getName(), field.get(this)); + Object o = field.get(this); + if( o!=null && o.getClass().isArray() ) { + try { + o = Arrays.asList((Object[]) o); + } catch (Throwable e) { + } + } + map.put(field.getName(), o); } catch (Throwable e) { e.printStackTrace(); } diff --git a/activemq-core/src/main/java/org/activemq/command/ConsumerInfo.java b/activemq-core/src/main/java/org/activemq/command/ConsumerInfo.java index 05b82565b3..1be181eff8 100755 --- a/activemq-core/src/main/java/org/activemq/command/ConsumerInfo.java +++ b/activemq-core/src/main/java/org/activemq/command/ConsumerInfo.java @@ -278,6 +278,7 @@ public class ConsumerInfo extends BaseCommand { } /** + * @openwire:property version=1 * @return Returns the networkSubscription. */ public boolean isNetworkSubscription(){ diff --git a/activemq-core/src/main/java/org/activemq/command/DiscoveryEvent.java b/activemq-core/src/main/java/org/activemq/command/DiscoveryEvent.java index 0d932c6e1e..b89a300b42 100755 --- a/activemq-core/src/main/java/org/activemq/command/DiscoveryEvent.java +++ b/activemq-core/src/main/java/org/activemq/command/DiscoveryEvent.java @@ -1,5 +1,4 @@ /** -* ActiveMQ: The Open Source Message Fabric * * Copyright 2005 (C) LogicBlaze, Inc. http://www.logicblaze.com * @@ -49,16 +48,17 @@ public class DiscoveryEvent implements DataStructure { this.serviceName = serviceName; } + /** + * @openwire:property version=1 + */ public String getBrokerName(){ return brokerName; } - public void setBrokerName(String name){ this.brokerName = name; } public boolean isMarshallAware() { return false; - } - + } } diff --git a/activemq-core/src/main/java/org/activemq/command/Message.java b/activemq-core/src/main/java/org/activemq/command/Message.java index 2b892bc08a..121c0cc744 100755 --- a/activemq-core/src/main/java/org/activemq/command/Message.java +++ b/activemq-core/src/main/java/org/activemq/command/Message.java @@ -604,6 +604,7 @@ abstract public class Message extends BaseCommand implements MarshallAware, Mess } /** + * @openwire:property version=1 * @return Returns the recievedByDFBridge. */ public boolean isRecievedByDFBridge(){ diff --git a/activemq-core/src/main/java/org/activemq/network/DemandForwardingBridge.java b/activemq-core/src/main/java/org/activemq/network/DemandForwardingBridge.java index cbf82878b0..5ba7246dcf 100755 --- a/activemq-core/src/main/java/org/activemq/network/DemandForwardingBridge.java +++ b/activemq-core/src/main/java/org/activemq/network/DemandForwardingBridge.java @@ -19,6 +19,9 @@ package org.activemq.network; import java.io.IOException; + +import javax.jms.JMSException; + import org.activemq.advisory.AdvisorySupport; import org.activemq.command.ActiveMQTopic; import org.activemq.command.BrokerId; @@ -37,9 +40,12 @@ import org.activemq.command.ProducerInfo; import org.activemq.command.RemoveInfo; import org.activemq.command.SessionInfo; import org.activemq.command.ShutdownInfo; +import org.activemq.filter.BooleanExpression; +import org.activemq.filter.MessageEvaluationContext; import org.activemq.transport.Transport; import org.activemq.transport.TransportListener; import org.activemq.util.IdGenerator; +import org.activemq.util.JMSExceptionSupport; import org.activemq.util.LongSequenceGenerator; import org.activemq.util.ServiceStopper; import org.activemq.util.ServiceSupport; @@ -231,18 +237,22 @@ public class DemandForwardingBridge implements Bridge { // Create a new local subscription ConsumerInfo info = (ConsumerInfo) data; BrokerId[] path = info.getBrokerPath(); - String pathStr = "{"; - for (int i =0; path != null && i < path.length; i++){ - pathStr += path[i] + " , "; + + if( (path!=null && path.length>0) || info.isNetworkSubscription() ) { + // Ignore: We only support directly connected brokers for now. + return; } - pathStr += "}"; if( contains(info.getBrokerPath(), localBrokerPath[0]) ) { // Ignore this consumer as it's a consumer we locally sent to the broker. return; } - + + if( log.isTraceEnabled() ) + log.trace("Forwarding sub on " + localBroker + " from " + remoteBroker + " on "+info); + // Update the packet to show where it came from. + info = info.copy(); info.setBrokerPath( appendToBrokerPath(info.getBrokerPath(), remoteBrokerPath) ); DemandSubscription sub = new DemandSubscription(info); @@ -259,6 +269,21 @@ public class DemandForwardingBridge implements Bridge { subscriptionMapByRemoteId.put(sub.remoteInfo.getConsumerId(), sub); sub.localInfo.setBrokerPath(info.getBrokerPath()); sub.localInfo.setNetworkSubscription(true); + // This works for now since we use a VM connection to the local broker. + // may need to change if we ever subscribe to a remote broker. + sub.localInfo.setAdditionalPredicate(new BooleanExpression(){ + public boolean matches(MessageEvaluationContext message) throws JMSException { + try { + return matchesForwardingFilter(message.getMessage()); + } catch (IOException e) { + throw JMSExceptionSupport.create(e); + } + } + public Object evaluate(MessageEvaluationContext message) throws JMSException { + return matches(message) ? Boolean.TRUE : Boolean.FALSE; + } + }); + localBroker.oneway(sub.localInfo); } if( data.getClass() == RemoveInfo.class ) { @@ -275,30 +300,35 @@ public class DemandForwardingBridge implements Bridge { log.info("Network connection between " + localBroker + " and " + remoteBroker + " shutdown: "+error.getMessage(), error); ServiceSupport.dispose(this); } + + boolean matchesForwardingFilter(Message message) { + if( message.isRecievedByDFBridge() || contains(message.getBrokerPath(), remoteBrokerPath[0]) ) + return false; + + // Don't propagate advisory messages about network subscriptions + if( message.isAdvisory() + && message.getDataStructure()!=null + && message.getDataStructure().getDataStructureType()==CommandTypes.CONSUMER_INFO) { + + ConsumerInfo info=(ConsumerInfo) message.getDataStructure(); + if(info.isNetworkSubscription()) { + return false; + } + } + return true; + } protected void serviceLocalCommand(Command command) { + boolean trace = log.isTraceEnabled(); try { if( command.isMessageDispatch() ) { MessageDispatch md = (MessageDispatch) command; Message message = md.getMessage(); - //only allow one network hop for this type of bridge - if (message.isRecievedByDFBridge()){ - return; - } - if (message.isAdvisory() && message.getDataStructure() != null && message.getDataStructure().getDataStructureType()==CommandTypes.CONSUMER_INFO){ - ConsumerInfo info = (ConsumerInfo)message.getDataStructure(); - if (info.isNetworkSubscription()){ - //don't want to forward these - return; - } - } DemandSubscription sub = (DemandSubscription)subscriptionMapByLocalId.get(md.getConsumerId()); if( sub!=null ) { - if( contains(message.getBrokerPath(), remoteBrokerPath[0]) ) { - // Don't send the message back to the originator - return; - } + message = message.copy(); + // Update the packet to show where it came from. message.setBrokerPath( appendToBrokerPath(message.getBrokerPath(), localBrokerPath) ); @@ -308,9 +338,14 @@ public class DemandForwardingBridge implements Bridge { if( message.getOriginalTransactionId()==null ) message.setOriginalTransactionId(message.getTransactionId()); message.setTransactionId(null); + message.setRecievedByDFBridge(true); message.evictMarshlledForm(); + if( trace ) + log.trace("bridging " + localBroker + " -> " + remoteBroker + ": "+message); + remoteBroker.oneway( message ); + sub.dispatched++; if( sub.dispatched > (sub.localInfo.getPrefetchSize()*.75) ) { localBroker.oneway(new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, demandConsumerDispatched)); diff --git a/activemq-core/src/main/java/org/activemq/network/NetworkConnector.java b/activemq-core/src/main/java/org/activemq/network/NetworkConnector.java index d9c6d07255..567758785f 100644 --- a/activemq-core/src/main/java/org/activemq/network/NetworkConnector.java +++ b/activemq-core/src/main/java/org/activemq/network/NetworkConnector.java @@ -48,6 +48,7 @@ public class NetworkConnector implements Service, DiscoveryListener { private ConcurrentHashMap bridges = new ConcurrentHashMap(); private String brokerName; + boolean failover=true; public NetworkConnector() { } @@ -88,7 +89,17 @@ public class NetworkConnector implements Service, DiscoveryListener { if (bridges.containsKey(uri) || localURI.equals(uri)) return; - log.info("Establishing network connection between " + localURI + " and " + event.getBrokerName() + " at " + uri); + URI connectUri = uri; + if( failover ) { + try { + connectUri = new URI("failover:"+connectUri); + } catch (URISyntaxException e) { + log.warn("Could not create failover URI: "+connectUri); + return; + } + } + + log.info("Establishing network connection between " + localURI + " and " + event.getBrokerName() + " at " + connectUri); Transport localTransport; try { @@ -101,15 +112,15 @@ public class NetworkConnector implements Service, DiscoveryListener { Transport remoteTransport; try { - remoteTransport = TransportFactory.connect(uri); + remoteTransport = TransportFactory.connect(connectUri); } catch (Exception e) { ServiceSupport.dispose(localTransport); - log.warn("Could not connect to remote URI: " + uri + ": " + e, e); + log.warn("Could not connect to remote URI: " + connectUri + ": " + e, e); return; } - Bridge bridge = createBridge(localTransport, remoteTransport); + Bridge bridge = createBridge(localTransport, remoteTransport, event); bridges.put(uri, bridge); try { bridge.start(); @@ -170,8 +181,17 @@ public class NetworkConnector implements Service, DiscoveryListener { // Implementation methods // ------------------------------------------------------------------------- - protected Bridge createBridge(Transport localTransport, Transport remoteTransport) { - return new DemandForwardingBridge(localTransport, remoteTransport); + protected Bridge createBridge(Transport localTransport, Transport remoteTransport, final DiscoveryEvent event) { + return new DemandForwardingBridge(localTransport, remoteTransport) { + protected void serviceRemoteException(IOException error) { + super.serviceRemoteException(error); + try { + // Notify the discovery agent that the remote broker failed. + discoveryAgent.serviceFailed(event); + } catch (IOException e) { + } + } + }; } public void setBrokerName(String brokerName) { @@ -181,4 +201,12 @@ public class NetworkConnector implements Service, DiscoveryListener { } } + public boolean isFailover() { + return failover; + } + + public void setFailover(boolean reliable) { + this.failover = reliable; + } + } diff --git a/activemq-core/src/main/java/org/activemq/openwire/v1/ConsumerInfoMarshaller.java b/activemq-core/src/main/java/org/activemq/openwire/v1/ConsumerInfoMarshaller.java index f961064552..9bc106c288 100755 --- a/activemq-core/src/main/java/org/activemq/openwire/v1/ConsumerInfoMarshaller.java +++ b/activemq-core/src/main/java/org/activemq/openwire/v1/ConsumerInfoMarshaller.java @@ -89,6 +89,7 @@ public class ConsumerInfoMarshaller extends BaseCommandMarshaller { info.setBrokerPath(null); } + info.setNetworkSubscription(bs.readBoolean()); } @@ -113,6 +114,7 @@ public class ConsumerInfoMarshaller extends BaseCommandMarshaller { bs.writeBoolean(info.isRetroactive()); rc += marshalObjectArray(wireFormat, info.getBrokerPath(), bs); + bs.writeBoolean(info.isNetworkSubscription()); return rc+5; } @@ -140,6 +142,7 @@ public class ConsumerInfoMarshaller extends BaseCommandMarshaller { bs.readBoolean(); dataOut.writeByte(info.getPriority()); marshalObjectArray(wireFormat, info.getBrokerPath(), dataOut, bs); + bs.readBoolean(); } } diff --git a/activemq-core/src/main/java/org/activemq/openwire/v1/MessageMarshaller.java b/activemq-core/src/main/java/org/activemq/openwire/v1/MessageMarshaller.java index a3499d9a9e..2ead9913c5 100755 --- a/activemq-core/src/main/java/org/activemq/openwire/v1/MessageMarshaller.java +++ b/activemq-core/src/main/java/org/activemq/openwire/v1/MessageMarshaller.java @@ -107,6 +107,7 @@ public abstract class MessageMarshaller extends BaseCommandMarshaller { info.setArrival(unmarshalLong(wireFormat, dataIn, bs)); info.setUserID(readString(dataIn, bs)); + info.setRecievedByDFBridge(bs.readBoolean()); info.afterUnmarshall(wireFormat); @@ -153,6 +154,7 @@ public abstract class MessageMarshaller extends BaseCommandMarshaller { rc += marshalObjectArray(wireFormat, info.getBrokerPath(), bs); rc+=marshal1Long(wireFormat, info.getArrival(), bs); rc += writeString(info.getUserID(), bs); + bs.writeBoolean(info.isRecievedByDFBridge()); return rc+9; } @@ -204,6 +206,7 @@ public abstract class MessageMarshaller extends BaseCommandMarshaller { marshalObjectArray(wireFormat, info.getBrokerPath(), dataOut, bs); marshal2Long(wireFormat, info.getArrival(), dataOut, bs); writeString(info.getUserID(), dataOut, bs); + bs.readBoolean(); info.afterMarshall(wireFormat); diff --git a/activemq-core/src/main/java/org/activemq/transport/MutexTransport.java b/activemq-core/src/main/java/org/activemq/transport/MutexTransport.java index 492d2a14a5..c60374e855 100755 --- a/activemq-core/src/main/java/org/activemq/transport/MutexTransport.java +++ b/activemq-core/src/main/java/org/activemq/transport/MutexTransport.java @@ -53,4 +53,8 @@ public class MutexTransport extends TransportFilter { } } + public String toString() { + return next.toString(); + } + } \ No newline at end of file diff --git a/activemq-core/src/main/java/org/activemq/transport/ResponseCorrelator.java b/activemq-core/src/main/java/org/activemq/transport/ResponseCorrelator.java index b32f2f52a0..e8470dea18 100755 --- a/activemq-core/src/main/java/org/activemq/transport/ResponseCorrelator.java +++ b/activemq-core/src/main/java/org/activemq/transport/ResponseCorrelator.java @@ -84,4 +84,9 @@ final public class ResponseCorrelator extends TransportFilter { commandListener.onCommand(command); } } + + public String toString() { + return next.toString(); + } + } diff --git a/activemq-core/src/main/java/org/activemq/transport/TransportLogger.java b/activemq-core/src/main/java/org/activemq/transport/TransportLogger.java index a985d517cb..c48133b5c0 100755 --- a/activemq-core/src/main/java/org/activemq/transport/TransportLogger.java +++ b/activemq-core/src/main/java/org/activemq/transport/TransportLogger.java @@ -52,4 +52,8 @@ public class TransportLogger extends TransportFilter { } next.oneway(command); } + + public String toString() { + return next.toString(); + } } diff --git a/activemq-core/src/main/java/org/activemq/transport/WireFormatNegotiator.java b/activemq-core/src/main/java/org/activemq/transport/WireFormatNegotiator.java index 552659d024..679c8b5de3 100755 --- a/activemq-core/src/main/java/org/activemq/transport/WireFormatNegotiator.java +++ b/activemq-core/src/main/java/org/activemq/transport/WireFormatNegotiator.java @@ -106,4 +106,8 @@ public class WireFormatNegotiator extends TransportFilter { } commandListener.onCommand(command); } + + public String toString() { + return next.toString(); + } } diff --git a/activemq-core/src/main/java/org/activemq/transport/discovery/DiscoveryAgent.java b/activemq-core/src/main/java/org/activemq/transport/discovery/DiscoveryAgent.java index a962b201ad..5fb0f09740 100755 --- a/activemq-core/src/main/java/org/activemq/transport/discovery/DiscoveryAgent.java +++ b/activemq-core/src/main/java/org/activemq/transport/discovery/DiscoveryAgent.java @@ -23,6 +23,7 @@ import java.io.IOException; import javax.jms.JMSException; import org.activemq.Service; +import org.activemq.command.DiscoveryEvent; /** * An agent used to discover other instances of a service. @@ -47,6 +48,12 @@ public interface DiscoveryAgent extends Service { */ void registerService(String name) throws IOException; + /** + * A process actively using a service may see it go down before the DiscoveryAgent notices the + * service's failure. That process can use this method to notify the DiscoveryAgent of the failure + * so that other listeners of this DiscoveryAgent can also be made aware of the failure. + */ + void serviceFailed(DiscoveryEvent event) throws IOException; String getGroup(); diff --git a/activemq-core/src/main/java/org/activemq/transport/discovery/multicast/MulticastDiscoveryAgent.java b/activemq-core/src/main/java/org/activemq/transport/discovery/multicast/MulticastDiscoveryAgent.java index 09e944fd83..62ca0f61db 100755 --- a/activemq-core/src/main/java/org/activemq/transport/discovery/multicast/MulticastDiscoveryAgent.java +++ b/activemq-core/src/main/java/org/activemq/transport/discovery/multicast/MulticastDiscoveryAgent.java @@ -348,4 +348,8 @@ public class MulticastDiscoveryAgent implements DiscoveryAgent,Runnable{ } return result; } + + public void serviceFailed(DiscoveryEvent event) throws IOException { + processDead(event.getBrokerName(), event.getServiceName()); + } } diff --git a/activemq-core/src/main/java/org/activemq/transport/discovery/rendezvous/RendezvousDiscoveryAgent.java b/activemq-core/src/main/java/org/activemq/transport/discovery/rendezvous/RendezvousDiscoveryAgent.java index 28ae87cf02..48580c855b 100755 --- a/activemq-core/src/main/java/org/activemq/transport/discovery/rendezvous/RendezvousDiscoveryAgent.java +++ b/activemq-core/src/main/java/org/activemq/transport/discovery/rendezvous/RendezvousDiscoveryAgent.java @@ -236,4 +236,8 @@ public class RendezvousDiscoveryAgent implements DiscoveryAgent, ServiceListener public void setBrokerName(String brokerName) { this.brokerName = brokerName; } + + public void serviceFailed(DiscoveryEvent event) throws IOException { + // TODO: is there a way to notify the JmDNS that the service failed? + } } diff --git a/activemq-core/src/main/java/org/activemq/transport/discovery/simple/SimpleDiscoveryAgent.java b/activemq-core/src/main/java/org/activemq/transport/discovery/simple/SimpleDiscoveryAgent.java index 5758f2cf31..0af92dfef7 100755 --- a/activemq-core/src/main/java/org/activemq/transport/discovery/simple/SimpleDiscoveryAgent.java +++ b/activemq-core/src/main/java/org/activemq/transport/discovery/simple/SimpleDiscoveryAgent.java @@ -82,4 +82,7 @@ public class SimpleDiscoveryAgent implements DiscoveryAgent { public void setBrokerName(String brokerName) { } + public void serviceFailed(DiscoveryEvent event) throws IOException { + } + } diff --git a/activemq-core/src/main/java/org/activemq/transport/failover/FailoverTransport.java b/activemq-core/src/main/java/org/activemq/transport/failover/FailoverTransport.java index 7b0c21dfa7..86dbfc0d95 100755 --- a/activemq-core/src/main/java/org/activemq/transport/failover/FailoverTransport.java +++ b/activemq-core/src/main/java/org/activemq/transport/failover/FailoverTransport.java @@ -397,4 +397,8 @@ public class FailoverTransport implements CompositeTransport { this.useExponentialBackOff = useExponentialBackOff; } + public String toString() { + return connectedTransportURI==null ? "unconnected" : connectedTransportURI.toString(); + } + } diff --git a/activemq-core/src/main/java/org/activemq/transport/peer/PeerTransportFactory.java b/activemq-core/src/main/java/org/activemq/transport/peer/PeerTransportFactory.java index 25b33fe2d4..602175e9e8 100755 --- a/activemq-core/src/main/java/org/activemq/transport/peer/PeerTransportFactory.java +++ b/activemq-core/src/main/java/org/activemq/transport/peer/PeerTransportFactory.java @@ -67,7 +67,7 @@ public class PeerTransportFactory extends TransportFactory { private VMTransportFactory createTransportFactory(URI location) throws IOException { try { String group = location.getHost(); - String broker = location.getPath(); + String broker = URISupport.stripPrefix(location.getPath(), "/"); if( group == null ) { group = "default"; diff --git a/activemq-core/src/main/java/org/activemq/transport/tcp/TcpTransport.java b/activemq-core/src/main/java/org/activemq/transport/tcp/TcpTransport.java index 58e5604228..a4160d0815 100755 --- a/activemq-core/src/main/java/org/activemq/transport/tcp/TcpTransport.java +++ b/activemq-core/src/main/java/org/activemq/transport/tcp/TcpTransport.java @@ -121,7 +121,7 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S * @return pretty print of 'this' */ public String toString() { - return "TcpTransport: " + socket; + return "tcp://"+socket.getInetAddress()+":"+socket.getPort(); } /** diff --git a/activemq-core/src/main/java/org/activemq/transport/vm/VMTransport.java b/activemq-core/src/main/java/org/activemq/transport/vm/VMTransport.java index 2a9eb18702..5cce019f1e 100755 --- a/activemq-core/src/main/java/org/activemq/transport/vm/VMTransport.java +++ b/activemq-core/src/main/java/org/activemq/transport/vm/VMTransport.java @@ -16,10 +16,12 @@ package org.activemq.transport.vm; import java.io.IOException; +import java.net.URI; import java.util.Collections; import java.util.Iterator; import java.util.LinkedList; import java.util.List; + import org.activemq.command.Command; import org.activemq.command.Response; import org.activemq.transport.FutureResponse; @@ -27,6 +29,8 @@ import org.activemq.transport.Transport; import org.activemq.transport.TransportListener; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; + +import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicLong; /** * A Transport implementation that uses direct method invocations. * @@ -34,13 +38,21 @@ import org.apache.commons.logging.LogFactory; */ public class VMTransport implements Transport{ private static final Log log=LogFactory.getLog(VMTransport.class); + private static final AtomicLong nextId = new AtomicLong(0); + protected VMTransport peer; protected TransportListener transportListener; protected boolean disposed; protected boolean marshal; protected boolean network; protected List queue = Collections.synchronizedList(new LinkedList()); + protected final URI location; + protected final long id; + public VMTransport(URI location) { + this.location = location; + this.id=nextId.getAndIncrement(); + } synchronized public VMTransport getPeer(){ return peer; @@ -116,5 +128,9 @@ public class VMTransport implements Transport{ public void setNetwork(boolean network){ this.network=network; } + + public String toString() { + return location+"#"+id; + } } diff --git a/activemq-core/src/main/java/org/activemq/transport/vm/VMTransportServer.java b/activemq-core/src/main/java/org/activemq/transport/vm/VMTransportServer.java index 04aa7d205b..727dbf2361 100755 --- a/activemq-core/src/main/java/org/activemq/transport/vm/VMTransportServer.java +++ b/activemq-core/src/main/java/org/activemq/transport/vm/VMTransportServer.java @@ -74,7 +74,7 @@ public class VMTransportServer implements TransportServer { throw new IOException("Server TransportAcceptListener is null."); connectionCount.incrementAndGet(); - VMTransport client = new VMTransport() { + VMTransport client = new VMTransport(location) { public void stop() throws Exception { if( disposed ) return; @@ -85,7 +85,7 @@ public class VMTransportServer implements TransportServer { }; }; - VMTransport server = new VMTransport(); + VMTransport server = new VMTransport(location); client.setPeer(server); server.setPeer(client); al.onAccept(configure(server)); diff --git a/activemq-core/src/main/java/org/activemq/util/MessageList.java b/activemq-core/src/main/java/org/activemq/util/MessageList.java index f71bc6b3f2..3bbf7e989c 100644 --- a/activemq-core/src/main/java/org/activemq/util/MessageList.java +++ b/activemq-core/src/main/java/org/activemq/util/MessageList.java @@ -17,14 +17,16 @@ **/ package org.activemq.util; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + import javax.jms.Message; import javax.jms.MessageListener; +import javax.jms.TextMessage; import junit.framework.Assert; -import java.util.ArrayList; -import java.util.List; - /** * A simple container for performing testing and rendezvous style code. * @@ -59,6 +61,21 @@ public class MessageList extends Assert implements MessageListener { return new ArrayList(messages); } } + + public synchronized List getTextMessages() { + synchronized (semaphore) { + ArrayList l = new ArrayList(); + for (Iterator iter = messages.iterator(); iter.hasNext();) { + try { + TextMessage m = (TextMessage) iter.next(); + l.add(m.getText()); + } catch (Throwable e) { + l.add(""+e); + } + } + return l; + } + } public void onMessage(Message message) { synchronized (semaphore) { diff --git a/activemq-core/src/main/java/org/activemq/util/URISupport.java b/activemq-core/src/main/java/org/activemq/util/URISupport.java index 16819c4012..891bac6032 100755 --- a/activemq-core/src/main/java/org/activemq/util/URISupport.java +++ b/activemq-core/src/main/java/org/activemq/util/URISupport.java @@ -236,7 +236,7 @@ public class URISupport { return rc; } - private static String stripPrefix(String value, String prefix) { + public static String stripPrefix(String value, String prefix) { if( value.startsWith(prefix) ) return value.substring(prefix.length()); return value; diff --git a/activemq-core/src/test/java/org/activemq/jndi/JNDITestSupport.java b/activemq-core/src/test/java/org/activemq/jndi/JNDITestSupport.java index efd31be3ed..e1e29fa9fe 100755 --- a/activemq-core/src/test/java/org/activemq/jndi/JNDITestSupport.java +++ b/activemq-core/src/test/java/org/activemq/jndi/JNDITestSupport.java @@ -91,7 +91,6 @@ public abstract class JNDITestSupport extends TestCase { } protected void configureEnvironment() { - environment.put("useEmbeddedBroker", "true"); environment.put("brokerURL", "vm://localhost"); } diff --git a/activemq-core/src/test/java/org/activemq/transport/peer/PeerTransportTest.java b/activemq-core/src/test/java/org/activemq/transport/peer/PeerTransportTest.java index 8b4dbb0b9c..be9e2ea40f 100755 --- a/activemq-core/src/test/java/org/activemq/transport/peer/PeerTransportTest.java +++ b/activemq-core/src/test/java/org/activemq/transport/peer/PeerTransportTest.java @@ -48,27 +48,28 @@ public class PeerTransportTest extends TestCase { protected int deliveryMode = DeliveryMode.NON_PERSISTENT; protected MessageProducer[] producers; protected Connection[] connections; - protected MessageList messageList = new MessageList(); + protected MessageList messageList[]; protected void setUp() throws Exception { - messageList.setVerbose(true); connections = new Connection[NUMBER_IN_CLUSTER]; producers = new MessageProducer[NUMBER_IN_CLUSTER]; + messageList = new MessageList[NUMBER_IN_CLUSTER]; Destination destination = createDestination(); String root = System.getProperty("activemq.store.dir"); for (int i = 0;i < NUMBER_IN_CLUSTER;i++) { - System.setProperty("activemq.store.dir", root + "_broker_" + i); - connections[i] = createConnection(); + connections[i] = createConnection(i); connections[i].setClientID("ClusterTest" + i); connections[i].start(); + Session session = connections[i].createSession(false, Session.AUTO_ACKNOWLEDGE); producers[i] = session.createProducer(destination); producers[i].setDeliveryMode(deliveryMode); MessageConsumer consumer = createMessageConsumer(session, destination); - consumer.setMessageListener(messageList); + messageList[i] = new MessageList(); + consumer.setMessageListener(messageList[i]); } System.out.println("Sleeping to ensure cluster is fully connected"); Thread.sleep(10000); @@ -87,13 +88,9 @@ public class PeerTransportTest extends TestCase { return session.createConsumer(destination); } - protected int expectedReceiveCount() { - return MESSAGE_COUNT * NUMBER_IN_CLUSTER * NUMBER_IN_CLUSTER; - } - - protected Connection createConnection() throws JMSException { + protected Connection createConnection(int i) throws JMSException { System.err.println("creating connection ...."); - ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory("peer://" + getClass().getName()); + ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory("peer://" + getClass().getName()+"/node"+i); return fac.createConnection(); } @@ -120,10 +117,16 @@ public class PeerTransportTest extends TestCase { TextMessage textMessage = new ActiveMQTextMessage(); textMessage.setText("MSG-NO: " + i + " in cluster: " + x); producers[x].send(textMessage); - // System.out.println("SENT MSG: " + textMessage); } } - messageList.assertMessagesReceived(expectedReceiveCount()); + for (int i = 0;i < NUMBER_IN_CLUSTER;i++) { + messageList[i].assertMessagesReceived(expectedReceiveCount()); + } } + + protected int expectedReceiveCount() { + return MESSAGE_COUNT * NUMBER_IN_CLUSTER; + } + } \ No newline at end of file diff --git a/activemq-core/src/test/resources/jndi.properties b/activemq-core/src/test/resources/jndi.properties index 7a1aebd6a7..833cf79207 100755 --- a/activemq-core/src/test/resources/jndi.properties +++ b/activemq-core/src/test/resources/jndi.properties @@ -5,9 +5,6 @@ java.naming.factory.initial = org.activemq.jndi.ActiveMQInitialContextFactory # use the following property to configure the default connector java.naming.provider.url = vm://localhost -# use the following property to embed a broker inside this JVM -#useEmbeddedBroker = true - # use the following property to specify a class path resource or URL # used to configure an embedded broker using the XML configuration file #brokerXmlConfig = file:src/conf/sample-conf/default.xml diff --git a/activemq-core/src/test/resources/spring-embedded.xml b/activemq-core/src/test/resources/spring-embedded.xml index a817a45f93..7aa3334215 100755 --- a/activemq-core/src/test/resources/spring-embedded.xml +++ b/activemq-core/src/test/resources/spring-embedded.xml @@ -12,9 +12,6 @@ - - true - diff --git a/activemq-core/src/test/resources/spring-jndi.xml b/activemq-core/src/test/resources/spring-jndi.xml index e3860fb401..bcc5e138c9 100755 --- a/activemq-core/src/test/resources/spring-jndi.xml +++ b/activemq-core/src/test/resources/spring-jndi.xml @@ -12,8 +12,6 @@ org.activemq.jndi.ActiveMQInitialContextFactory - true - example.Spring.MyTopic