- updated openwire marshalers.

- added some more toString() methods to the transports
- Fixed up the advisories so that duplicate consumer infos are not sent
- Changed the demand forwarding bridge so that loop back message filtering occurs in the broker instead of on the bridge


git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@358056 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2005-12-20 18:05:38 +00:00
parent 80d85fecb9
commit 8032ef40be
32 changed files with 220 additions and 108 deletions

View File

@ -66,8 +66,6 @@ public class ActiveMQConnectionFactory implements ConnectionFactory, QueueConnec
protected String password;
protected String clientID;
protected boolean useEmbeddedBroker;
// optimization flags
private ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
private boolean disableTimeStampsByDefault = false;
@ -317,14 +315,6 @@ public class ActiveMQConnectionFactory implements ConnectionFactory, QueueConnec
this.useAsyncSend = useAsyncSend;
}
public boolean isUseEmbeddedBroker() {
return useEmbeddedBroker;
}
public void setUseEmbeddedBroker(boolean useEmbeddedBroker) {
this.useEmbeddedBroker = useEmbeddedBroker;
}
public String getUserName() {
return userName;
}

View File

@ -118,7 +118,7 @@ public class AdvisoryBroker extends BrokerFilter {
for (Iterator iter = consumers.values().iterator(); iter.hasNext();) {
ConsumerInfo value = (ConsumerInfo) iter.next();
ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(value.getDestination());
fireConsumerAdvisory(context, topic, value);
fireConsumerAdvisory(context, topic, value, info.getConsumerId());
}
}
}
@ -194,9 +194,12 @@ public class AdvisoryBroker extends BrokerFilter {
}
protected void fireConsumerAdvisory(ConnectionContext context, ActiveMQTopic topic, Command command) throws Throwable {
fireConsumerAdvisory(context, topic, command, null);
}
protected void fireConsumerAdvisory(ConnectionContext context, ActiveMQTopic topic, Command command, ConsumerId targetConsumerId) throws Throwable {
ActiveMQMessage advisoryMessage = new ActiveMQMessage();
advisoryMessage.setIntProperty("consumerCount", consumers.size());
fireAdvisory(context, topic, command, null, advisoryMessage);
fireAdvisory(context, topic, command, targetConsumerId, advisoryMessage);
}
protected void fireAdvisory(ConnectionContext context, ActiveMQTopic topic, Command command, ConsumerId targetConsumerId, ActiveMQMessage advisoryMessage) throws Throwable {

View File

@ -78,7 +78,6 @@ public abstract class AbstractConnection implements Service, Connection, Task, C
protected final List dispatchQueue = Collections.synchronizedList(new LinkedList());
protected final TaskRunner taskRunner;
protected final Connector connector;
protected boolean demandForwardingBridge;
private ConnectionStatistics statistics = new ConnectionStatistics();
protected final ConcurrentHashMap connectionStates = new ConcurrentHashMap();
@ -326,7 +325,6 @@ public abstract class AbstractConnection implements Service, Connection, Task, C
public Response processMessage(Message messageSend) throws Throwable {
messageSend.setRecievedByDFBridge(demandForwardingBridge);
broker.send(lookupConnectionState(messageSend.getProducerId()).getContext(), messageSend);
return null;
}
@ -337,7 +335,6 @@ public abstract class AbstractConnection implements Service, Connection, Task, C
}
public Response processBrokerInfo(BrokerInfo info) {
demandForwardingBridge = true;
return null;
}

View File

@ -21,10 +21,6 @@ package org.activemq.broker;
import java.io.IOException;
import org.activemq.command.Command;
import org.activemq.command.CommandTypes;
import org.activemq.command.ConsumerInfo;
import org.activemq.command.Message;
import org.activemq.command.MessageDispatch;
import org.activemq.command.Response;
import org.activemq.thread.TaskRunnerFactory;
import org.activemq.transport.Transport;
@ -183,34 +179,15 @@ public class TransportConnection extends AbstractConnection {
protected void dispatch(Command command){
if(isValidForNetwork(command)){
try{
setMarkedCandidate(true);
transport.oneway(command);
getStatistics().onCommand(command);
}catch(IOException e){
serviceException(e);
}finally{
setMarkedCandidate(false);
}
try{
setMarkedCandidate(true);
transport.oneway(command);
getStatistics().onCommand(command);
}catch(IOException e){
serviceException(e);
}finally{
setMarkedCandidate(false);
}
}
protected boolean isValidForNetwork(Command command){
boolean result=true;
if(demandForwardingBridge&&command.isMessageDispatch()){
MessageDispatch md=(MessageDispatch) command;
Message message=md.getMessage();
if(message.isAdvisory()&&message.getDataStructure()!=null
&&message.getDataStructure().getDataStructureType()==CommandTypes.CONSUMER_INFO){
ConsumerInfo info=(ConsumerInfo) message.getDataStructure();
if(info.isNetworkSubscription()){
// don't want to forward these
result=false;
}
}
}
return result;
}
}
}

View File

@ -20,6 +20,7 @@ package org.activemq.command;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.Arrays;
import java.util.LinkedHashMap;
/**
@ -118,7 +119,14 @@ abstract public class BaseCommand implements Command {
}
try {
map.put(field.getName(), field.get(this));
Object o = field.get(this);
if( o!=null && o.getClass().isArray() ) {
try {
o = Arrays.asList((Object[]) o);
} catch (Throwable e) {
}
}
map.put(field.getName(), o);
} catch (Throwable e) {
e.printStackTrace();
}

View File

@ -278,6 +278,7 @@ public class ConsumerInfo extends BaseCommand {
}
/**
* @openwire:property version=1
* @return Returns the networkSubscription.
*/
public boolean isNetworkSubscription(){

View File

@ -1,5 +1,4 @@
/**
* <a href="http://activemq.org">ActiveMQ: The Open Source Message Fabric</a>
*
* Copyright 2005 (C) LogicBlaze, Inc. http://www.logicblaze.com
*
@ -49,16 +48,17 @@ public class DiscoveryEvent implements DataStructure {
this.serviceName = serviceName;
}
/**
* @openwire:property version=1
*/
public String getBrokerName(){
return brokerName;
}
public void setBrokerName(String name){
this.brokerName = name;
}
public boolean isMarshallAware() {
return false;
}
}
}

View File

@ -604,6 +604,7 @@ abstract public class Message extends BaseCommand implements MarshallAware, Mess
}
/**
* @openwire:property version=1
* @return Returns the recievedByDFBridge.
*/
public boolean isRecievedByDFBridge(){

View File

@ -19,6 +19,9 @@
package org.activemq.network;
import java.io.IOException;
import javax.jms.JMSException;
import org.activemq.advisory.AdvisorySupport;
import org.activemq.command.ActiveMQTopic;
import org.activemq.command.BrokerId;
@ -37,9 +40,12 @@ import org.activemq.command.ProducerInfo;
import org.activemq.command.RemoveInfo;
import org.activemq.command.SessionInfo;
import org.activemq.command.ShutdownInfo;
import org.activemq.filter.BooleanExpression;
import org.activemq.filter.MessageEvaluationContext;
import org.activemq.transport.Transport;
import org.activemq.transport.TransportListener;
import org.activemq.util.IdGenerator;
import org.activemq.util.JMSExceptionSupport;
import org.activemq.util.LongSequenceGenerator;
import org.activemq.util.ServiceStopper;
import org.activemq.util.ServiceSupport;
@ -231,18 +237,22 @@ public class DemandForwardingBridge implements Bridge {
// Create a new local subscription
ConsumerInfo info = (ConsumerInfo) data;
BrokerId[] path = info.getBrokerPath();
String pathStr = "{";
for (int i =0; path != null && i < path.length; i++){
pathStr += path[i] + " , ";
if( (path!=null && path.length>0) || info.isNetworkSubscription() ) {
// Ignore: We only support directly connected brokers for now.
return;
}
pathStr += "}";
if( contains(info.getBrokerPath(), localBrokerPath[0]) ) {
// Ignore this consumer as it's a consumer we locally sent to the broker.
return;
}
if( log.isTraceEnabled() )
log.trace("Forwarding sub on " + localBroker + " from " + remoteBroker + " on "+info);
// Update the packet to show where it came from.
info = info.copy();
info.setBrokerPath( appendToBrokerPath(info.getBrokerPath(), remoteBrokerPath) );
DemandSubscription sub = new DemandSubscription(info);
@ -259,6 +269,21 @@ public class DemandForwardingBridge implements Bridge {
subscriptionMapByRemoteId.put(sub.remoteInfo.getConsumerId(), sub);
sub.localInfo.setBrokerPath(info.getBrokerPath());
sub.localInfo.setNetworkSubscription(true);
// This works for now since we use a VM connection to the local broker.
// may need to change if we ever subscribe to a remote broker.
sub.localInfo.setAdditionalPredicate(new BooleanExpression(){
public boolean matches(MessageEvaluationContext message) throws JMSException {
try {
return matchesForwardingFilter(message.getMessage());
} catch (IOException e) {
throw JMSExceptionSupport.create(e);
}
}
public Object evaluate(MessageEvaluationContext message) throws JMSException {
return matches(message) ? Boolean.TRUE : Boolean.FALSE;
}
});
localBroker.oneway(sub.localInfo);
}
if( data.getClass() == RemoveInfo.class ) {
@ -275,30 +300,35 @@ public class DemandForwardingBridge implements Bridge {
log.info("Network connection between " + localBroker + " and " + remoteBroker + " shutdown: "+error.getMessage(), error);
ServiceSupport.dispose(this);
}
boolean matchesForwardingFilter(Message message) {
if( message.isRecievedByDFBridge() || contains(message.getBrokerPath(), remoteBrokerPath[0]) )
return false;
// Don't propagate advisory messages about network subscriptions
if( message.isAdvisory()
&& message.getDataStructure()!=null
&& message.getDataStructure().getDataStructureType()==CommandTypes.CONSUMER_INFO) {
ConsumerInfo info=(ConsumerInfo) message.getDataStructure();
if(info.isNetworkSubscription()) {
return false;
}
}
return true;
}
protected void serviceLocalCommand(Command command) {
boolean trace = log.isTraceEnabled();
try {
if( command.isMessageDispatch() ) {
MessageDispatch md = (MessageDispatch) command;
Message message = md.getMessage();
//only allow one network hop for this type of bridge
if (message.isRecievedByDFBridge()){
return;
}
if (message.isAdvisory() && message.getDataStructure() != null && message.getDataStructure().getDataStructureType()==CommandTypes.CONSUMER_INFO){
ConsumerInfo info = (ConsumerInfo)message.getDataStructure();
if (info.isNetworkSubscription()){
//don't want to forward these
return;
}
}
DemandSubscription sub = (DemandSubscription)subscriptionMapByLocalId.get(md.getConsumerId());
if( sub!=null ) {
if( contains(message.getBrokerPath(), remoteBrokerPath[0]) ) {
// Don't send the message back to the originator
return;
}
message = message.copy();
// Update the packet to show where it came from.
message.setBrokerPath( appendToBrokerPath(message.getBrokerPath(), localBrokerPath) );
@ -308,9 +338,14 @@ public class DemandForwardingBridge implements Bridge {
if( message.getOriginalTransactionId()==null )
message.setOriginalTransactionId(message.getTransactionId());
message.setTransactionId(null);
message.setRecievedByDFBridge(true);
message.evictMarshlledForm();
if( trace )
log.trace("bridging " + localBroker + " -> " + remoteBroker + ": "+message);
remoteBroker.oneway( message );
sub.dispatched++;
if( sub.dispatched > (sub.localInfo.getPrefetchSize()*.75) ) {
localBroker.oneway(new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, demandConsumerDispatched));

View File

@ -48,6 +48,7 @@ public class NetworkConnector implements Service, DiscoveryListener {
private ConcurrentHashMap bridges = new ConcurrentHashMap();
private String brokerName;
boolean failover=true;
public NetworkConnector() {
}
@ -88,7 +89,17 @@ public class NetworkConnector implements Service, DiscoveryListener {
if (bridges.containsKey(uri) || localURI.equals(uri))
return;
log.info("Establishing network connection between " + localURI + " and " + event.getBrokerName() + " at " + uri);
URI connectUri = uri;
if( failover ) {
try {
connectUri = new URI("failover:"+connectUri);
} catch (URISyntaxException e) {
log.warn("Could not create failover URI: "+connectUri);
return;
}
}
log.info("Establishing network connection between " + localURI + " and " + event.getBrokerName() + " at " + connectUri);
Transport localTransport;
try {
@ -101,15 +112,15 @@ public class NetworkConnector implements Service, DiscoveryListener {
Transport remoteTransport;
try {
remoteTransport = TransportFactory.connect(uri);
remoteTransport = TransportFactory.connect(connectUri);
}
catch (Exception e) {
ServiceSupport.dispose(localTransport);
log.warn("Could not connect to remote URI: " + uri + ": " + e, e);
log.warn("Could not connect to remote URI: " + connectUri + ": " + e, e);
return;
}
Bridge bridge = createBridge(localTransport, remoteTransport);
Bridge bridge = createBridge(localTransport, remoteTransport, event);
bridges.put(uri, bridge);
try {
bridge.start();
@ -170,8 +181,17 @@ public class NetworkConnector implements Service, DiscoveryListener {
// Implementation methods
// -------------------------------------------------------------------------
protected Bridge createBridge(Transport localTransport, Transport remoteTransport) {
return new DemandForwardingBridge(localTransport, remoteTransport);
protected Bridge createBridge(Transport localTransport, Transport remoteTransport, final DiscoveryEvent event) {
return new DemandForwardingBridge(localTransport, remoteTransport) {
protected void serviceRemoteException(IOException error) {
super.serviceRemoteException(error);
try {
// Notify the discovery agent that the remote broker failed.
discoveryAgent.serviceFailed(event);
} catch (IOException e) {
}
}
};
}
public void setBrokerName(String brokerName) {
@ -181,4 +201,12 @@ public class NetworkConnector implements Service, DiscoveryListener {
}
}
public boolean isFailover() {
return failover;
}
public void setFailover(boolean reliable) {
this.failover = reliable;
}
}

View File

@ -89,6 +89,7 @@ public class ConsumerInfoMarshaller extends BaseCommandMarshaller {
info.setBrokerPath(null);
}
info.setNetworkSubscription(bs.readBoolean());
}
@ -113,6 +114,7 @@ public class ConsumerInfoMarshaller extends BaseCommandMarshaller {
bs.writeBoolean(info.isRetroactive());
rc += marshalObjectArray(wireFormat, info.getBrokerPath(), bs);
bs.writeBoolean(info.isNetworkSubscription());
return rc+5;
}
@ -140,6 +142,7 @@ public class ConsumerInfoMarshaller extends BaseCommandMarshaller {
bs.readBoolean();
dataOut.writeByte(info.getPriority());
marshalObjectArray(wireFormat, info.getBrokerPath(), dataOut, bs);
bs.readBoolean();
}
}

View File

@ -107,6 +107,7 @@ public abstract class MessageMarshaller extends BaseCommandMarshaller {
info.setArrival(unmarshalLong(wireFormat, dataIn, bs));
info.setUserID(readString(dataIn, bs));
info.setRecievedByDFBridge(bs.readBoolean());
info.afterUnmarshall(wireFormat);
@ -153,6 +154,7 @@ public abstract class MessageMarshaller extends BaseCommandMarshaller {
rc += marshalObjectArray(wireFormat, info.getBrokerPath(), bs);
rc+=marshal1Long(wireFormat, info.getArrival(), bs);
rc += writeString(info.getUserID(), bs);
bs.writeBoolean(info.isRecievedByDFBridge());
return rc+9;
}
@ -204,6 +206,7 @@ public abstract class MessageMarshaller extends BaseCommandMarshaller {
marshalObjectArray(wireFormat, info.getBrokerPath(), dataOut, bs);
marshal2Long(wireFormat, info.getArrival(), dataOut, bs);
writeString(info.getUserID(), dataOut, bs);
bs.readBoolean();
info.afterMarshall(wireFormat);

View File

@ -53,4 +53,8 @@ public class MutexTransport extends TransportFilter {
}
}
public String toString() {
return next.toString();
}
}

View File

@ -84,4 +84,9 @@ final public class ResponseCorrelator extends TransportFilter {
commandListener.onCommand(command);
}
}
public String toString() {
return next.toString();
}
}

View File

@ -52,4 +52,8 @@ public class TransportLogger extends TransportFilter {
}
next.oneway(command);
}
public String toString() {
return next.toString();
}
}

View File

@ -106,4 +106,8 @@ public class WireFormatNegotiator extends TransportFilter {
}
commandListener.onCommand(command);
}
public String toString() {
return next.toString();
}
}

View File

@ -23,6 +23,7 @@ import java.io.IOException;
import javax.jms.JMSException;
import org.activemq.Service;
import org.activemq.command.DiscoveryEvent;
/**
* An agent used to discover other instances of a service.
@ -47,6 +48,12 @@ public interface DiscoveryAgent extends Service {
*/
void registerService(String name) throws IOException;
/**
* A process actively using a service may see it go down before the DiscoveryAgent notices the
* service's failure. That process can use this method to notify the DiscoveryAgent of the failure
* so that other listeners of this DiscoveryAgent can also be made aware of the failure.
*/
void serviceFailed(DiscoveryEvent event) throws IOException;
String getGroup();

View File

@ -348,4 +348,8 @@ public class MulticastDiscoveryAgent implements DiscoveryAgent,Runnable{
}
return result;
}
public void serviceFailed(DiscoveryEvent event) throws IOException {
processDead(event.getBrokerName(), event.getServiceName());
}
}

View File

@ -236,4 +236,8 @@ public class RendezvousDiscoveryAgent implements DiscoveryAgent, ServiceListener
public void setBrokerName(String brokerName) {
this.brokerName = brokerName;
}
public void serviceFailed(DiscoveryEvent event) throws IOException {
// TODO: is there a way to notify the JmDNS that the service failed?
}
}

View File

@ -82,4 +82,7 @@ public class SimpleDiscoveryAgent implements DiscoveryAgent {
public void setBrokerName(String brokerName) {
}
public void serviceFailed(DiscoveryEvent event) throws IOException {
}
}

View File

@ -397,4 +397,8 @@ public class FailoverTransport implements CompositeTransport {
this.useExponentialBackOff = useExponentialBackOff;
}
public String toString() {
return connectedTransportURI==null ? "unconnected" : connectedTransportURI.toString();
}
}

View File

@ -67,7 +67,7 @@ public class PeerTransportFactory extends TransportFactory {
private VMTransportFactory createTransportFactory(URI location) throws IOException {
try {
String group = location.getHost();
String broker = location.getPath();
String broker = URISupport.stripPrefix(location.getPath(), "/");
if( group == null ) {
group = "default";

View File

@ -121,7 +121,7 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S
* @return pretty print of 'this'
*/
public String toString() {
return "TcpTransport: " + socket;
return "tcp://"+socket.getInetAddress()+":"+socket.getPort();
}
/**

View File

@ -16,10 +16,12 @@
package org.activemq.transport.vm;
import java.io.IOException;
import java.net.URI;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import org.activemq.command.Command;
import org.activemq.command.Response;
import org.activemq.transport.FutureResponse;
@ -27,6 +29,8 @@ import org.activemq.transport.Transport;
import org.activemq.transport.TransportListener;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicLong;
/**
* A Transport implementation that uses direct method invocations.
*
@ -34,13 +38,21 @@ import org.apache.commons.logging.LogFactory;
*/
public class VMTransport implements Transport{
private static final Log log=LogFactory.getLog(VMTransport.class);
private static final AtomicLong nextId = new AtomicLong(0);
protected VMTransport peer;
protected TransportListener transportListener;
protected boolean disposed;
protected boolean marshal;
protected boolean network;
protected List queue = Collections.synchronizedList(new LinkedList());
protected final URI location;
protected final long id;
public VMTransport(URI location) {
this.location = location;
this.id=nextId.getAndIncrement();
}
synchronized public VMTransport getPeer(){
return peer;
@ -116,5 +128,9 @@ public class VMTransport implements Transport{
public void setNetwork(boolean network){
this.network=network;
}
public String toString() {
return location+"#"+id;
}
}

View File

@ -74,7 +74,7 @@ public class VMTransportServer implements TransportServer {
throw new IOException("Server TransportAcceptListener is null.");
connectionCount.incrementAndGet();
VMTransport client = new VMTransport() {
VMTransport client = new VMTransport(location) {
public void stop() throws Exception {
if( disposed )
return;
@ -85,7 +85,7 @@ public class VMTransportServer implements TransportServer {
};
};
VMTransport server = new VMTransport();
VMTransport server = new VMTransport(location);
client.setPeer(server);
server.setPeer(client);
al.onAccept(configure(server));

View File

@ -17,14 +17,16 @@
**/
package org.activemq.util;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
import junit.framework.Assert;
import java.util.ArrayList;
import java.util.List;
/**
* A simple container for performing testing and rendezvous style code.
*
@ -59,6 +61,21 @@ public class MessageList extends Assert implements MessageListener {
return new ArrayList(messages);
}
}
public synchronized List getTextMessages() {
synchronized (semaphore) {
ArrayList l = new ArrayList();
for (Iterator iter = messages.iterator(); iter.hasNext();) {
try {
TextMessage m = (TextMessage) iter.next();
l.add(m.getText());
} catch (Throwable e) {
l.add(""+e);
}
}
return l;
}
}
public void onMessage(Message message) {
synchronized (semaphore) {

View File

@ -236,7 +236,7 @@ public class URISupport {
return rc;
}
private static String stripPrefix(String value, String prefix) {
public static String stripPrefix(String value, String prefix) {
if( value.startsWith(prefix) )
return value.substring(prefix.length());
return value;

View File

@ -91,7 +91,6 @@ public abstract class JNDITestSupport extends TestCase {
}
protected void configureEnvironment() {
environment.put("useEmbeddedBroker", "true");
environment.put("brokerURL", "vm://localhost");
}

View File

@ -48,27 +48,28 @@ public class PeerTransportTest extends TestCase {
protected int deliveryMode = DeliveryMode.NON_PERSISTENT;
protected MessageProducer[] producers;
protected Connection[] connections;
protected MessageList messageList = new MessageList();
protected MessageList messageList[];
protected void setUp() throws Exception {
messageList.setVerbose(true);
connections = new Connection[NUMBER_IN_CLUSTER];
producers = new MessageProducer[NUMBER_IN_CLUSTER];
messageList = new MessageList[NUMBER_IN_CLUSTER];
Destination destination = createDestination();
String root = System.getProperty("activemq.store.dir");
for (int i = 0;i < NUMBER_IN_CLUSTER;i++) {
System.setProperty("activemq.store.dir", root + "_broker_" + i);
connections[i] = createConnection();
connections[i] = createConnection(i);
connections[i].setClientID("ClusterTest" + i);
connections[i].start();
Session session = connections[i].createSession(false, Session.AUTO_ACKNOWLEDGE);
producers[i] = session.createProducer(destination);
producers[i].setDeliveryMode(deliveryMode);
MessageConsumer consumer = createMessageConsumer(session, destination);
consumer.setMessageListener(messageList);
messageList[i] = new MessageList();
consumer.setMessageListener(messageList[i]);
}
System.out.println("Sleeping to ensure cluster is fully connected");
Thread.sleep(10000);
@ -87,13 +88,9 @@ public class PeerTransportTest extends TestCase {
return session.createConsumer(destination);
}
protected int expectedReceiveCount() {
return MESSAGE_COUNT * NUMBER_IN_CLUSTER * NUMBER_IN_CLUSTER;
}
protected Connection createConnection() throws JMSException {
protected Connection createConnection(int i) throws JMSException {
System.err.println("creating connection ....");
ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory("peer://" + getClass().getName());
ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory("peer://" + getClass().getName()+"/node"+i);
return fac.createConnection();
}
@ -120,10 +117,16 @@ public class PeerTransportTest extends TestCase {
TextMessage textMessage = new ActiveMQTextMessage();
textMessage.setText("MSG-NO: " + i + " in cluster: " + x);
producers[x].send(textMessage);
// System.out.println("SENT MSG: " + textMessage);
}
}
messageList.assertMessagesReceived(expectedReceiveCount());
for (int i = 0;i < NUMBER_IN_CLUSTER;i++) {
messageList[i].assertMessagesReceived(expectedReceiveCount());
}
}
protected int expectedReceiveCount() {
return MESSAGE_COUNT * NUMBER_IN_CLUSTER;
}
}

View File

@ -5,9 +5,6 @@ java.naming.factory.initial = org.activemq.jndi.ActiveMQInitialContextFactory
# use the following property to configure the default connector
java.naming.provider.url = vm://localhost
# use the following property to embed a broker inside this JVM
#useEmbeddedBroker = true
# use the following property to specify a class path resource or URL
# used to configure an embedded broker using the XML configuration file
#brokerXmlConfig = file:src/conf/sample-conf/default.xml

View File

@ -12,9 +12,6 @@
<!-- JMS ConnectionFactory to use, configuring the embedded broker using XML -->
<bean id="jmsFactory" class="org.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="vm://localhost" />
<property name="useEmbeddedBroker">
<value>true</value>
</property>
</bean>
<!-- Spring JMS Template -->

View File

@ -12,8 +12,6 @@
<property name="environment">
<props>
<prop key="java.naming.factory.initial">org.activemq.jndi.ActiveMQInitialContextFactory</prop>
<prop key="useEmbeddedBroker">true</prop>
<!-- lets register some destinations -->
<prop key="topic.MyTopic">example.Spring.MyTopic</prop>
</props>