mirror of https://github.com/apache/activemq.git
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1326502 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
fc4eb63ad8
commit
7dc2c4e26f
|
@ -30,7 +30,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
|
|||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import javax.management.ObjectName;
|
||||
|
||||
import org.apache.activemq.Service;
|
||||
import org.apache.activemq.advisory.AdvisorySupport;
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
|
@ -38,35 +37,11 @@ import org.apache.activemq.broker.BrokerServiceAware;
|
|||
import org.apache.activemq.broker.TransportConnection;
|
||||
import org.apache.activemq.broker.region.AbstractRegion;
|
||||
import org.apache.activemq.broker.region.DurableTopicSubscription;
|
||||
import org.apache.activemq.broker.region.Region;
|
||||
import org.apache.activemq.broker.region.RegionBroker;
|
||||
import org.apache.activemq.broker.region.Subscription;
|
||||
import org.apache.activemq.broker.region.policy.PolicyEntry;
|
||||
import org.apache.activemq.command.ActiveMQDestination;
|
||||
import org.apache.activemq.command.ActiveMQMessage;
|
||||
import org.apache.activemq.command.ActiveMQTempDestination;
|
||||
import org.apache.activemq.command.ActiveMQTopic;
|
||||
import org.apache.activemq.command.BrokerId;
|
||||
import org.apache.activemq.command.BrokerInfo;
|
||||
import org.apache.activemq.command.Command;
|
||||
import org.apache.activemq.command.ConnectionError;
|
||||
import org.apache.activemq.command.ConnectionId;
|
||||
import org.apache.activemq.command.ConnectionInfo;
|
||||
import org.apache.activemq.command.ConsumerId;
|
||||
import org.apache.activemq.command.ConsumerInfo;
|
||||
import org.apache.activemq.command.DataStructure;
|
||||
import org.apache.activemq.command.DestinationInfo;
|
||||
import org.apache.activemq.command.ExceptionResponse;
|
||||
import org.apache.activemq.command.KeepAliveInfo;
|
||||
import org.apache.activemq.command.Message;
|
||||
import org.apache.activemq.command.MessageAck;
|
||||
import org.apache.activemq.command.MessageDispatch;
|
||||
import org.apache.activemq.command.NetworkBridgeFilter;
|
||||
import org.apache.activemq.command.ProducerInfo;
|
||||
import org.apache.activemq.command.RemoveInfo;
|
||||
import org.apache.activemq.command.Response;
|
||||
import org.apache.activemq.command.SessionInfo;
|
||||
import org.apache.activemq.command.ShutdownInfo;
|
||||
import org.apache.activemq.command.WireFormatInfo;
|
||||
import org.apache.activemq.command.*;
|
||||
import org.apache.activemq.filter.DestinationFilter;
|
||||
import org.apache.activemq.filter.MessageEvaluationContext;
|
||||
import org.apache.activemq.thread.DefaultThreadPools;
|
||||
|
@ -1011,7 +986,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
|
|||
|
||||
List<ConsumerId> candidateConsumers = consumerInfo.getNetworkConsumerIds();
|
||||
Collection<Subscription> currentSubs =
|
||||
getRegionSubscriptions(consumerInfo.getDestination().isTopic());
|
||||
getRegionSubscriptions(consumerInfo.getDestination());
|
||||
for (Subscription sub : currentSubs) {
|
||||
List<ConsumerId> networkConsumers = sub.getConsumerInfo().getNetworkConsumerIds();
|
||||
if (!networkConsumers.isEmpty()) {
|
||||
|
@ -1079,11 +1054,37 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
|
|||
return found;
|
||||
}
|
||||
|
||||
private final Collection<Subscription> getRegionSubscriptions(boolean isTopic) {
|
||||
RegionBroker region = (RegionBroker) brokerService.getRegionBroker();
|
||||
AbstractRegion abstractRegion = (AbstractRegion)
|
||||
(isTopic ? region.getTopicRegion() : region.getQueueRegion());
|
||||
return abstractRegion.getSubscriptions().values();
|
||||
private final Collection<Subscription> getRegionSubscriptions(ActiveMQDestination dest) {
|
||||
RegionBroker region_broker = (RegionBroker) brokerService.getRegionBroker();
|
||||
Region region;
|
||||
Collection<Subscription> subs;
|
||||
|
||||
region = null;
|
||||
switch ( dest.getDestinationType() )
|
||||
{
|
||||
case ActiveMQDestination.QUEUE_TYPE:
|
||||
region = region_broker.getQueueRegion();
|
||||
break;
|
||||
|
||||
case ActiveMQDestination.TOPIC_TYPE:
|
||||
region = region_broker.getTopicRegion();
|
||||
break;
|
||||
|
||||
case ActiveMQDestination.TEMP_QUEUE_TYPE:
|
||||
region = region_broker.getTempQueueRegion();
|
||||
break;
|
||||
|
||||
case ActiveMQDestination.TEMP_TOPIC_TYPE:
|
||||
region = region_broker.getTempTopicRegion();
|
||||
break;
|
||||
}
|
||||
|
||||
if ( region instanceof AbstractRegion )
|
||||
subs = ((AbstractRegion) region).getSubscriptions().values();
|
||||
else
|
||||
subs = null;
|
||||
|
||||
return subs;
|
||||
}
|
||||
|
||||
protected DemandSubscription createDemandSubscription(ConsumerInfo info) throws IOException {
|
||||
|
|
|
@ -0,0 +1,975 @@
|
|||
/*
|
||||
*/
|
||||
|
||||
package org.apache.activemq.bugs;
|
||||
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Enumeration;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.logging.Level;
|
||||
import java.util.logging.Logger;
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.Destination;
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.Message;
|
||||
import javax.jms.TextMessage;
|
||||
import javax.jms.Topic;
|
||||
import javax.jms.MessageProducer;
|
||||
import javax.jms.MessageConsumer;
|
||||
import javax.jms.Session;
|
||||
|
||||
import org.junit.*;
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
import org.apache.activemq.ActiveMQConnection;
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
import org.apache.activemq.command.ActiveMQDestination;
|
||||
import org.apache.activemq.network.DiscoveryNetworkConnector;
|
||||
import org.apache.activemq.network.NetworkConnector;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public class AMQ3274Test
|
||||
{
|
||||
protected static int Next_broker_num = 0;
|
||||
protected EmbeddedTcpBroker broker1;
|
||||
protected EmbeddedTcpBroker broker2;
|
||||
|
||||
protected int nextEchoId = 0;
|
||||
protected boolean testError = false;
|
||||
|
||||
protected int echoResponseFill = 0; // Number of "filler" response messages per request
|
||||
|
||||
protected static Log LOG;
|
||||
|
||||
static
|
||||
{
|
||||
LOG = LogFactory.getLog(AMQ3274Test.class);
|
||||
}
|
||||
|
||||
public AMQ3274Test ()
|
||||
throws Exception
|
||||
{
|
||||
broker1 = new EmbeddedTcpBroker();
|
||||
broker2 = new EmbeddedTcpBroker();
|
||||
|
||||
broker1.coreConnectTo(broker2, true);
|
||||
broker2.coreConnectTo(broker1, true);
|
||||
}
|
||||
|
||||
public void logMessage (String msg)
|
||||
{
|
||||
System.out.println(msg);
|
||||
System.out.flush();
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
|
||||
public void testMessages (Session sess, MessageProducer req_prod, Destination resp_dest, int num_msg)
|
||||
throws Exception
|
||||
{
|
||||
MessageConsumer resp_cons;
|
||||
TextMessage msg;
|
||||
MessageClient cons_client;
|
||||
int cur;
|
||||
int tot_expected;
|
||||
|
||||
resp_cons = sess.createConsumer(resp_dest);
|
||||
|
||||
cons_client = new MessageClient(resp_cons, num_msg);
|
||||
cons_client.start();
|
||||
|
||||
cur = 0;
|
||||
while ( ( cur < num_msg ) && ( ! testError ) )
|
||||
{
|
||||
msg = sess.createTextMessage("MSG AAAA " + cur);
|
||||
msg.setIntProperty("SEQ", 100 + cur);
|
||||
msg.setStringProperty("TEST", "TOPO");
|
||||
msg.setJMSReplyTo(resp_dest);
|
||||
|
||||
if ( cur == ( num_msg - 1 ) )
|
||||
msg.setBooleanProperty("end-of-response", true);
|
||||
|
||||
req_prod.send(msg);
|
||||
|
||||
cur++;
|
||||
}
|
||||
|
||||
//
|
||||
// Give the consumer some time to receive the response.
|
||||
//
|
||||
cons_client.waitShutdown(5000);
|
||||
|
||||
//
|
||||
// Now shutdown the consumer if it's still running.
|
||||
//
|
||||
if ( cons_client.shutdown() )
|
||||
LOG.debug("Consumer client shutdown complete");
|
||||
else
|
||||
LOG.debug("Consumer client shutdown incomplete!!!");
|
||||
|
||||
|
||||
//
|
||||
// Check that the correct number of messages was received.
|
||||
//
|
||||
tot_expected = num_msg * ( echoResponseFill + 1 );
|
||||
|
||||
if ( cons_client.getNumMsgReceived() == tot_expected )
|
||||
{
|
||||
LOG.info("Have " + tot_expected + " messages, as-expected");
|
||||
}
|
||||
else
|
||||
{
|
||||
testError = true;
|
||||
LOG.info("Have " + cons_client.getNumMsgReceived() + " messages; expected " + tot_expected);
|
||||
}
|
||||
|
||||
resp_cons.close();
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Test one destination between the given "producer broker" and "consumer broker" specified.
|
||||
*/
|
||||
public void testOneDest (Connection conn, Session sess, Destination cons_dest, String prod_broker_url,
|
||||
String cons_broker_url, int num_msg)
|
||||
throws Exception
|
||||
{
|
||||
int echo_id;
|
||||
|
||||
EchoService echo_svc;
|
||||
String echo_queue_name;
|
||||
Destination prod_dest;
|
||||
MessageProducer msg_prod;
|
||||
|
||||
synchronized ( this )
|
||||
{
|
||||
echo_id = this.nextEchoId;
|
||||
this.nextEchoId++;
|
||||
}
|
||||
|
||||
echo_queue_name = "echo.queue." + echo_id;
|
||||
|
||||
//
|
||||
// Remove any previously-created echo queue with the same name.
|
||||
//
|
||||
LOG.trace("destroying the echo queue in case an old one exists");
|
||||
removeQueue(conn, echo_queue_name);
|
||||
|
||||
|
||||
//
|
||||
// Now start the echo service with that queue.
|
||||
//
|
||||
echo_svc = new EchoService(echo_queue_name, prod_broker_url);
|
||||
echo_svc.start();
|
||||
|
||||
|
||||
//
|
||||
// Create the Producer to the echo request Queue
|
||||
//
|
||||
LOG.trace("Creating echo queue and producer");
|
||||
prod_dest = sess.createQueue(echo_queue_name);
|
||||
msg_prod = sess.createProducer(prod_dest);
|
||||
|
||||
|
||||
//
|
||||
// Pass messages around.
|
||||
//
|
||||
testMessages(sess, msg_prod, cons_dest, num_msg);
|
||||
|
||||
|
||||
//
|
||||
//
|
||||
//
|
||||
|
||||
echo_svc.shutdown();
|
||||
msg_prod.close();
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* TEST TEMPORARY TOPICS
|
||||
*/
|
||||
public void testTempTopic (String prod_broker_url, String cons_broker_url)
|
||||
throws Exception
|
||||
{
|
||||
Connection conn;
|
||||
Session sess;
|
||||
Destination cons_dest;
|
||||
int echo_id;
|
||||
int num_msg;
|
||||
|
||||
num_msg = 5;
|
||||
|
||||
LOG.info("TESTING TEMP TOPICS " + prod_broker_url + " -> " + cons_broker_url + " (" + num_msg +
|
||||
" messages)");
|
||||
|
||||
|
||||
//
|
||||
// Connect to the bus.
|
||||
//
|
||||
|
||||
conn = createConnection(cons_broker_url);
|
||||
conn.start();
|
||||
sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
|
||||
|
||||
//
|
||||
// Create the destination on which messages are being tested.
|
||||
//
|
||||
|
||||
LOG.trace("Creating destination");
|
||||
cons_dest = sess.createTemporaryTopic();
|
||||
|
||||
testOneDest(conn, sess, cons_dest, prod_broker_url, cons_broker_url, num_msg);
|
||||
|
||||
|
||||
//
|
||||
// Cleanup
|
||||
//
|
||||
|
||||
sess.close();
|
||||
conn.close();
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* TEST TOPICS
|
||||
*/
|
||||
public void testTopic (String prod_broker_url, String cons_broker_url)
|
||||
throws Exception
|
||||
{
|
||||
int num_msg;
|
||||
|
||||
Connection conn;
|
||||
Session sess;
|
||||
String topic_name;
|
||||
|
||||
Destination cons_dest;
|
||||
|
||||
num_msg = 5;
|
||||
|
||||
LOG.info("TESTING TOPICS " + prod_broker_url + " -> " + cons_broker_url + " (" + num_msg +
|
||||
" messages)");
|
||||
|
||||
|
||||
//
|
||||
// Connect to the bus.
|
||||
//
|
||||
|
||||
conn = createConnection(cons_broker_url);
|
||||
conn.start();
|
||||
sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
|
||||
|
||||
//
|
||||
// Create the destination on which messages are being tested.
|
||||
//
|
||||
|
||||
topic_name = "topotest2.perm.topic";
|
||||
LOG.trace("Removing existing Topic");
|
||||
removeTopic(conn, topic_name);
|
||||
LOG.trace("Creating Topic, " + topic_name);
|
||||
cons_dest = sess.createTopic(topic_name);
|
||||
|
||||
testOneDest(conn, sess, cons_dest, prod_broker_url, cons_broker_url, num_msg);
|
||||
|
||||
|
||||
//
|
||||
// Cleanup
|
||||
//
|
||||
|
||||
removeTopic(conn, topic_name);
|
||||
sess.close();
|
||||
conn.close();
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* TEST TEMPORARY QUEUES
|
||||
*/
|
||||
public void testTempQueue (String prod_broker_url, String cons_broker_url)
|
||||
throws Exception
|
||||
{
|
||||
int echo_id;
|
||||
int num_msg;
|
||||
|
||||
Connection conn;
|
||||
Session sess;
|
||||
|
||||
Destination cons_dest;
|
||||
|
||||
num_msg = 5;
|
||||
|
||||
LOG.info("TESTING TEMP QUEUES " + prod_broker_url + " -> " + cons_broker_url + " (" + num_msg +
|
||||
" messages)");
|
||||
|
||||
|
||||
//
|
||||
// Connect to the bus.
|
||||
//
|
||||
|
||||
conn = createConnection(cons_broker_url);
|
||||
conn.start();
|
||||
sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
|
||||
|
||||
//
|
||||
// Create the destination on which messages are being tested.
|
||||
//
|
||||
|
||||
LOG.trace("Creating destination");
|
||||
cons_dest = sess.createTemporaryQueue();
|
||||
|
||||
testOneDest(conn, sess, cons_dest, prod_broker_url, cons_broker_url, num_msg);
|
||||
|
||||
|
||||
//
|
||||
// Cleanup
|
||||
//
|
||||
|
||||
sess.close();
|
||||
conn.close();
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* TEST QUEUES
|
||||
*/
|
||||
public void testQueue (String prod_broker_url, String cons_broker_url)
|
||||
throws Exception
|
||||
{
|
||||
int num_msg;
|
||||
|
||||
Connection conn;
|
||||
Session sess;
|
||||
String queue_name;
|
||||
|
||||
Destination cons_dest;
|
||||
|
||||
num_msg = 5;
|
||||
|
||||
LOG.info("TESTING QUEUES " + prod_broker_url + " -> " + cons_broker_url + " (" + num_msg +
|
||||
" messages)");
|
||||
|
||||
|
||||
//
|
||||
// Connect to the bus.
|
||||
//
|
||||
|
||||
conn = createConnection(cons_broker_url);
|
||||
conn.start();
|
||||
sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
|
||||
|
||||
//
|
||||
// Create the destination on which messages are being tested.
|
||||
//
|
||||
|
||||
queue_name = "topotest2.perm.queue";
|
||||
LOG.trace("Removing existing Queue");
|
||||
removeQueue(conn, queue_name);
|
||||
LOG.trace("Creating Queue, " + queue_name);
|
||||
cons_dest = sess.createQueue(queue_name);
|
||||
|
||||
testOneDest(conn, sess, cons_dest, prod_broker_url, cons_broker_url, num_msg);
|
||||
|
||||
|
||||
//
|
||||
// Cleanup
|
||||
//
|
||||
|
||||
removeQueue(conn, queue_name);
|
||||
sess.close();
|
||||
conn.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void run ()
|
||||
throws Exception
|
||||
{
|
||||
Thread start1;
|
||||
Thread start2;
|
||||
|
||||
testError = false;
|
||||
|
||||
// Use threads to avoid startup deadlock since the first broker started waits until
|
||||
// it knows the name of the remote broker before finishing its startup, which means
|
||||
// the remote must already be running.
|
||||
|
||||
start1 = new Thread() {
|
||||
public void run()
|
||||
{
|
||||
try {
|
||||
broker1.start();
|
||||
} catch (Exception ex) {
|
||||
LOG.error(null, ex);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
start2 = new Thread() {
|
||||
public void run()
|
||||
{
|
||||
try {
|
||||
broker2.start();
|
||||
} catch (Exception ex) {
|
||||
LOG.error(null, ex);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
start1.start();
|
||||
start2.start();
|
||||
|
||||
start1.join();
|
||||
start2.join();
|
||||
|
||||
if ( ! testError )
|
||||
this.testTempTopic(broker1.getConnectionUrl(), broker2.getConnectionUrl());
|
||||
|
||||
if ( ! testError )
|
||||
this.testTempQueue(broker1.getConnectionUrl(), broker2.getConnectionUrl());
|
||||
|
||||
if ( ! testError )
|
||||
this.testTopic(broker1.getConnectionUrl(), broker2.getConnectionUrl());
|
||||
|
||||
if ( ! testError )
|
||||
this.testQueue(broker1.getConnectionUrl(), broker2.getConnectionUrl());
|
||||
|
||||
Thread.sleep(100);
|
||||
|
||||
shutdown();
|
||||
|
||||
assertTrue(! testError);
|
||||
}
|
||||
|
||||
public void shutdown ()
|
||||
throws Exception
|
||||
{
|
||||
broker1.stop();
|
||||
broker2.stop();
|
||||
}
|
||||
|
||||
/**
|
||||
* @param args the command line arguments
|
||||
*/
|
||||
public static void main(String[] args)
|
||||
{
|
||||
AMQ3274Test main_obj;
|
||||
|
||||
try
|
||||
{
|
||||
main_obj = new AMQ3274Test();
|
||||
main_obj.run();
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
ex.printStackTrace();
|
||||
|
||||
LOG.error(null, ex);
|
||||
|
||||
System.exit(0);
|
||||
}
|
||||
}
|
||||
|
||||
protected Connection createConnection (String url)
|
||||
throws Exception
|
||||
{
|
||||
return org.apache.activemq.ActiveMQConnection.makeConnection(url);
|
||||
}
|
||||
|
||||
protected static void removeQueue (Connection conn, String dest_name)
|
||||
throws java.lang.Exception
|
||||
{
|
||||
org.apache.activemq.command.ActiveMQDestination dest;
|
||||
|
||||
if ( conn instanceof org.apache.activemq.ActiveMQConnection )
|
||||
{
|
||||
dest = org.apache.activemq.command.ActiveMQDestination.
|
||||
createDestination(dest_name, (byte) org.apache.activemq.command.ActiveMQDestination.QUEUE_TYPE);
|
||||
((org.apache.activemq.ActiveMQConnection)conn).destroyDestination(dest);
|
||||
}
|
||||
}
|
||||
|
||||
protected static void removeTopic (Connection conn, String dest_name)
|
||||
throws java.lang.Exception
|
||||
{
|
||||
org.apache.activemq.command.ActiveMQDestination dest;
|
||||
|
||||
if ( conn instanceof org.apache.activemq.ActiveMQConnection )
|
||||
{
|
||||
dest = org.apache.activemq.command.ActiveMQDestination.
|
||||
createDestination(dest_name, (byte) org.apache.activemq.command.ActiveMQDestination.TOPIC_TYPE);
|
||||
((org.apache.activemq.ActiveMQConnection)conn).destroyDestination(dest);
|
||||
}
|
||||
}
|
||||
|
||||
public static String fmtMsgInfo (Message msg)
|
||||
throws Exception
|
||||
{
|
||||
StringBuilder msg_desc;
|
||||
String prop;
|
||||
Enumeration prop_enum;
|
||||
|
||||
msg_desc = new StringBuilder();
|
||||
msg_desc = new StringBuilder();
|
||||
|
||||
if ( msg instanceof TextMessage )
|
||||
{
|
||||
msg_desc.append(((TextMessage) msg).getText());
|
||||
}
|
||||
else
|
||||
{
|
||||
msg_desc.append("[");
|
||||
msg_desc.append(msg.getClass().getName());
|
||||
msg_desc.append("]");
|
||||
}
|
||||
|
||||
prop_enum = msg.getPropertyNames();
|
||||
while ( prop_enum.hasMoreElements() )
|
||||
{
|
||||
prop = (String) prop_enum.nextElement();
|
||||
msg_desc.append("; ");
|
||||
msg_desc.append(prop);
|
||||
msg_desc.append("=");
|
||||
msg_desc.append(msg.getStringProperty(prop));
|
||||
}
|
||||
|
||||
return msg_desc.toString();
|
||||
}
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
|
||||
///////////////////////////////////////////////// INTERNAL CLASSES /////////////////////////////////////////////////
|
||||
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
protected class EmbeddedTcpBroker
|
||||
{
|
||||
protected BrokerService brokerSvc;
|
||||
protected int brokerNum;
|
||||
protected String brokerName;
|
||||
protected String brokerId;
|
||||
protected int port;
|
||||
protected String tcpUrl;
|
||||
|
||||
public EmbeddedTcpBroker ()
|
||||
throws Exception
|
||||
{
|
||||
brokerSvc = new BrokerService();
|
||||
|
||||
synchronized ( this.getClass() )
|
||||
{
|
||||
brokerNum = Next_broker_num;
|
||||
Next_broker_num++;
|
||||
}
|
||||
|
||||
brokerName = "broker" + brokerNum;
|
||||
brokerId = "b" + brokerNum;
|
||||
|
||||
brokerSvc.setBrokerName(brokerName);
|
||||
brokerSvc.setBrokerId(brokerId);
|
||||
|
||||
brokerSvc.setPersistent(false);
|
||||
brokerSvc.setUseJmx(false); // TBD
|
||||
|
||||
port = 60000 + ( brokerNum * 10 );
|
||||
|
||||
// Configure the transport connector (TCP)
|
||||
tcpUrl = "tcp://127.0.0.1:" + Integer.toString(port);
|
||||
brokerSvc.addConnector(tcpUrl);
|
||||
}
|
||||
|
||||
public Connection createConnection ()
|
||||
throws URISyntaxException, JMSException
|
||||
{
|
||||
Connection result;
|
||||
|
||||
result = org.apache.activemq.ActiveMQConnection.makeConnection(this.tcpUrl);
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
public String getConnectionUrl ()
|
||||
{
|
||||
return this.tcpUrl;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Create network connections to the given broker using the network-connector
|
||||
* configuration of CORE brokers (e.g. core1.bus.dev1.coresys.tmcs)
|
||||
*
|
||||
* @param other
|
||||
* @param duplex_f
|
||||
*/
|
||||
public void coreConnectTo (EmbeddedTcpBroker other, boolean duplex_f)
|
||||
throws Exception
|
||||
{
|
||||
this.makeConnectionTo(other, duplex_f, true);
|
||||
this.makeConnectionTo(other, duplex_f, false);
|
||||
}
|
||||
|
||||
public void start ()
|
||||
throws Exception
|
||||
{
|
||||
brokerSvc.start();
|
||||
//brokerSvc.waitUntilStarted();
|
||||
}
|
||||
|
||||
public void stop ()
|
||||
throws Exception
|
||||
{
|
||||
brokerSvc.stop();
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Make one connection to the other embedded broker, of the specified type (queue or topic)
|
||||
* using the standard CORE broker networking.
|
||||
*
|
||||
* @param other
|
||||
* @param duplex_f
|
||||
* @param queue_f
|
||||
* @throws Exception
|
||||
*/
|
||||
protected void makeConnectionTo (EmbeddedTcpBroker other, boolean duplex_f, boolean queue_f)
|
||||
throws Exception
|
||||
{
|
||||
NetworkConnector nw_conn;
|
||||
String prefix;
|
||||
ActiveMQDestination excl_dest;
|
||||
ArrayList excludes;
|
||||
|
||||
nw_conn = new DiscoveryNetworkConnector(new URI("static:(" + other.tcpUrl + ")"));
|
||||
nw_conn.setDuplex(duplex_f);
|
||||
|
||||
if ( queue_f )
|
||||
nw_conn.setConduitSubscriptions(false);
|
||||
else
|
||||
nw_conn.setConduitSubscriptions(true);
|
||||
|
||||
nw_conn.setNetworkTTL(5);
|
||||
nw_conn.setSuppressDuplicateQueueSubscriptions(true);
|
||||
nw_conn.setDecreaseNetworkConsumerPriority(true);
|
||||
nw_conn.setBridgeTempDestinations(true);
|
||||
|
||||
if ( queue_f )
|
||||
{
|
||||
prefix = "queue";
|
||||
excl_dest = ActiveMQDestination.createDestination(">", ActiveMQDestination.QUEUE_TYPE);
|
||||
}
|
||||
else
|
||||
{
|
||||
prefix = "topic";
|
||||
excl_dest = ActiveMQDestination.createDestination(">", ActiveMQDestination.TOPIC_TYPE);
|
||||
}
|
||||
|
||||
excludes = new ArrayList();
|
||||
excludes.add(excl_dest);
|
||||
nw_conn.setExcludedDestinations(excludes);
|
||||
|
||||
if ( duplex_f )
|
||||
nw_conn.setName(this.brokerId + "<-" + prefix + "->" + other.brokerId);
|
||||
else
|
||||
nw_conn.setName(this.brokerId + "-" + prefix + "->" + other.brokerId);
|
||||
|
||||
brokerSvc.addNetworkConnector(nw_conn);
|
||||
}
|
||||
}
|
||||
|
||||
protected class MessageClient extends java.lang.Thread
|
||||
{
|
||||
protected MessageConsumer msgCons;
|
||||
protected boolean shutdownInd;
|
||||
protected int expectedCount;
|
||||
protected int lastSeq = 0;
|
||||
protected int msgCount = 0;
|
||||
protected boolean haveFirstSeq;
|
||||
protected CountDownLatch shutdownLatch;
|
||||
|
||||
public MessageClient (MessageConsumer cons, int num_to_expect)
|
||||
{
|
||||
msgCons = cons;
|
||||
expectedCount = ( num_to_expect * ( echoResponseFill + 1 ) );
|
||||
shutdownLatch = new CountDownLatch(1);
|
||||
}
|
||||
|
||||
public void run ()
|
||||
{
|
||||
CountDownLatch latch;
|
||||
|
||||
try
|
||||
{
|
||||
synchronized ( this )
|
||||
{
|
||||
latch = shutdownLatch;
|
||||
}
|
||||
|
||||
shutdownInd = false;
|
||||
processMessages();
|
||||
|
||||
latch.countDown();
|
||||
}
|
||||
catch ( Exception exc )
|
||||
{
|
||||
LOG.error("message client error", exc);
|
||||
}
|
||||
}
|
||||
|
||||
public void waitShutdown (long timeout)
|
||||
{
|
||||
CountDownLatch latch;
|
||||
|
||||
try
|
||||
{
|
||||
synchronized ( this )
|
||||
{
|
||||
latch = shutdownLatch;
|
||||
}
|
||||
|
||||
if ( latch != null )
|
||||
latch.await(timeout, TimeUnit.MILLISECONDS);
|
||||
else
|
||||
LOG.info("echo client shutdown: client does not appear to be active");
|
||||
}
|
||||
catch ( InterruptedException int_exc )
|
||||
{
|
||||
LOG.warn("wait for message client shutdown interrupted", int_exc);
|
||||
}
|
||||
}
|
||||
|
||||
public boolean shutdown ()
|
||||
{
|
||||
boolean down_ind;
|
||||
|
||||
if ( ! shutdownInd )
|
||||
{
|
||||
shutdownInd = true;
|
||||
}
|
||||
|
||||
waitShutdown(200);
|
||||
|
||||
synchronized ( this )
|
||||
{
|
||||
if ( ( shutdownLatch == null ) || ( shutdownLatch.getCount() == 0 ) )
|
||||
down_ind = true;
|
||||
else
|
||||
down_ind = false;
|
||||
}
|
||||
|
||||
return down_ind;
|
||||
}
|
||||
|
||||
public int getNumMsgReceived ()
|
||||
{
|
||||
return msgCount;
|
||||
}
|
||||
|
||||
protected void processMessages ()
|
||||
throws Exception
|
||||
{
|
||||
Message in_msg;
|
||||
|
||||
haveFirstSeq = false;
|
||||
|
||||
//
|
||||
// Stop at shutdown time or after any test error is detected.
|
||||
//
|
||||
|
||||
while ( ( ! shutdownInd ) && ( ! testError ) )
|
||||
{
|
||||
in_msg = msgCons.receive(100);
|
||||
|
||||
if ( in_msg != null )
|
||||
{
|
||||
msgCount++;
|
||||
checkMessage(in_msg);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
protected void checkMessage (Message in_msg)
|
||||
throws Exception
|
||||
{
|
||||
int seq;
|
||||
|
||||
LOG.debug("received message " + fmtMsgInfo(in_msg));
|
||||
|
||||
//
|
||||
// Only check messages with a sequence number.
|
||||
//
|
||||
|
||||
if ( in_msg.propertyExists("SEQ") )
|
||||
{
|
||||
seq = in_msg.getIntProperty("SEQ");
|
||||
|
||||
if ( ( haveFirstSeq ) && ( seq != ( lastSeq + 1 ) ) )
|
||||
{
|
||||
LOG.error("***ERROR*** incorrect sequence number; expected " +
|
||||
Integer.toString(lastSeq + 1) + " but have " +
|
||||
Integer.toString(seq));
|
||||
|
||||
testError = true;
|
||||
}
|
||||
|
||||
lastSeq = seq;
|
||||
|
||||
if ( msgCount > expectedCount )
|
||||
{
|
||||
LOG.warn("*** have more messages than expected; have " + msgCount +
|
||||
"; expect " + expectedCount);
|
||||
|
||||
testError = true;
|
||||
}
|
||||
}
|
||||
|
||||
if ( in_msg.propertyExists("end-of-response") )
|
||||
{
|
||||
LOG.trace("received end-of-response message");
|
||||
shutdownInd = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
protected class EchoService extends java.lang.Thread
|
||||
{
|
||||
protected String destName;
|
||||
protected Connection jmsConn;
|
||||
protected Session sess;
|
||||
protected MessageConsumer msg_cons;
|
||||
protected boolean Shutdown_ind;
|
||||
|
||||
protected Destination req_dest;
|
||||
protected Destination resp_dest;
|
||||
protected MessageProducer msg_prod;
|
||||
|
||||
protected CountDownLatch waitShutdown;
|
||||
|
||||
public EchoService (String dest, Connection broker_conn)
|
||||
throws Exception
|
||||
{
|
||||
destName = dest;
|
||||
jmsConn = broker_conn;
|
||||
|
||||
Shutdown_ind = false;
|
||||
|
||||
sess = jmsConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
req_dest = sess.createQueue(destName);
|
||||
msg_cons = sess.createConsumer(req_dest);
|
||||
|
||||
jmsConn.start();
|
||||
|
||||
waitShutdown = new CountDownLatch(1);
|
||||
}
|
||||
|
||||
public EchoService (String dest, String broker_url)
|
||||
throws Exception
|
||||
{
|
||||
this(dest, ActiveMQConnection.makeConnection(broker_url));
|
||||
}
|
||||
|
||||
public void run ()
|
||||
{
|
||||
Message req;
|
||||
|
||||
try
|
||||
{
|
||||
LOG.info("STARTING ECHO SERVICE");
|
||||
|
||||
while ( ! Shutdown_ind )
|
||||
{
|
||||
req = msg_cons.receive(100);
|
||||
if ( req != null )
|
||||
{
|
||||
if ( LOG.isDebugEnabled() )
|
||||
LOG.debug("ECHO request message " + req.toString());
|
||||
|
||||
resp_dest = req.getJMSReplyTo();
|
||||
if ( resp_dest != null )
|
||||
{
|
||||
msg_prod = sess.createProducer(resp_dest);
|
||||
msg_prod.send(req);
|
||||
msg_prod.close();
|
||||
msg_prod = null;
|
||||
}
|
||||
else
|
||||
{
|
||||
LOG.warn("invalid request: no reply-to destination given");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
LOG.error(null, ex);
|
||||
}
|
||||
finally
|
||||
{
|
||||
LOG.info("shutting down test echo service");
|
||||
|
||||
try
|
||||
{
|
||||
jmsConn.stop();
|
||||
}
|
||||
catch ( javax.jms.JMSException jms_exc )
|
||||
{
|
||||
LOG.warn("error on shutting down JMS connection", jms_exc);
|
||||
}
|
||||
|
||||
synchronized ( this )
|
||||
{
|
||||
waitShutdown.countDown();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Shut down the service, waiting up to 3 seconds for the service to terminate.
|
||||
*/
|
||||
public void shutdown ()
|
||||
{
|
||||
CountDownLatch wait_l;
|
||||
|
||||
synchronized ( this )
|
||||
{
|
||||
wait_l = waitShutdown;
|
||||
}
|
||||
|
||||
Shutdown_ind = true;
|
||||
|
||||
try
|
||||
{
|
||||
if ( wait_l != null )
|
||||
{
|
||||
if ( wait_l.await(3000, TimeUnit.MILLISECONDS) )
|
||||
LOG.info("echo service shutdown complete");
|
||||
else
|
||||
LOG.warn("timeout waiting for echo service shutdown");
|
||||
}
|
||||
else
|
||||
{
|
||||
LOG.info("echo service shutdown: service does not appear to be active");
|
||||
}
|
||||
}
|
||||
catch ( InterruptedException int_exc )
|
||||
{
|
||||
LOG.warn("interrupted while waiting for echo service shutdown");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue