This commit is contained in:
Darwin G. Flores 2006-03-18 04:31:17 +00:00
parent 351decc105
commit b182afbe46
7 changed files with 34 additions and 17 deletions

View File

@ -107,7 +107,7 @@ public class JmsSendReceiveTestSupport extends TestSupport implements MessageLis
for (int i = 0; i < data.length; i++) {
Message message = session.createTextMessage(data[i]);
Thread.sleep(200);
if (verbose) {
log.info("About to send a message: " + message + " with text: " + data[i]);
}

View File

@ -321,8 +321,6 @@
<include>**/*Test.*</include>
</includes>
<excludes>
<!-- http://jira.activemq.org/jira/browse/AMQ-586: fails on Windows 2003 -->
<exclude>**/TwoBrokerTopicSendReceiveTest.*</exclude>
<!-- http://jira.activemq.org/jira/browse/AMQ-537 -->
<exclude>**/PublishOnQueueConsumedMessageUsingActivemqXMLTest.*</exclude>
<!-- http://jira.activemq.org/jira/browse/AMQ-538 -->

View File

@ -1,4 +1,4 @@
<?xml version="1.0" encoding="UTF-8"?>
<?xml version="1.0" encoding="UTF-8"?>
<!--
Copyright 2005-2006 The Apache Software Foundation

View File

@ -137,7 +137,6 @@ public class Consumer extends Sampler implements MessageListener {
if ((ServerConnectionFactory.JORAM_SERVER.equals(this.getMQServer())) ||
(ServerConnectionFactory.MANTARAY_SERVER.equals(this.getMQServer()))) {
//Id set by server
} else {
IdGenerator idGenerator = new IdGenerator();
connection.setClientID(idGenerator.generateId());
@ -145,8 +144,7 @@ public class Consumer extends Sampler implements MessageListener {
}
//start connection before receiving messages.
connection.start();
// connection.start();
Session session = ServerConnectionFactory.createSession(connection,
this.getTransacted(),
this.getMQServer(),
@ -159,6 +157,7 @@ public class Consumer extends Sampler implements MessageListener {
this.getTopic());
MessageConsumer consumer = null;
connection.start();
if (ServerConnectionFactory.OPENJMS_SERVER.equals(this.getMQServer())) {
if (this.getTopic()) {
@ -214,7 +213,6 @@ public class Consumer extends Sampler implements MessageListener {
try {
TextMessage textMessage = (TextMessage) message;
Session session;
// lets force the content to be deserialized
String text = textMessage.getText();
count(1);

View File

@ -172,7 +172,6 @@ public class ConsumerSysTest extends Sampler implements MessageListener {
ACTIVEMQ_SERVER,
this.getTopic());
if (this.getDurable() && this.getTopic()) {
consumer = session.createDurableSubscriber((Topic) destination, getClass().getName());
} else {
@ -191,10 +190,12 @@ public class ConsumerSysTest extends Sampler implements MessageListener {
protected void publishConfirmMessage() throws JMSException {
MessageProducer publisher = null;
String text = PUBLISH_MSG;
Connection connection = ServerConnectionFactory.createConnectionFactory(this.getURL(),
ACTIVEMQ_SERVER,
this.getTopic(),
this.getEmbeddedBroker());
if (this.getDurable()) {
IdGenerator idGenerator = new IdGenerator();
connection.setClientID(idGenerator.generateId());
@ -247,13 +248,10 @@ public class ConsumerSysTest extends Sampler implements MessageListener {
* @throws JMSException
*/
public void run() throws JMSException {
// Receives the config message
suscribeConfigMessage();
// Create subscriber
subscribe();
// Publish confirm messages
publishConfirmMessage();
}

View File

@ -278,7 +278,6 @@ public class Producer extends Sampler implements TestListener {
* @throws Exception
*/
public void run() throws Exception {
start();
publish();
}

View File

@ -94,6 +94,10 @@ public class ServerConnectionFactory {
private static int mantarayProducerPortCount = 0;
private static int mantarayConsumerPortCount = 0;
protected static String user = ActiveMQConnection.DEFAULT_USER;
protected static String pwd = ActiveMQConnection.DEFAULT_PASSWORD;
/**
* Closes the connection passed through the parameter
*
@ -199,9 +203,8 @@ public class ServerConnectionFactory {
} else {
//Used to create a session from the default MQ server ActiveMQConnectionFactory.
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url);
factory.setUseAsyncSend(true);
ActiveMQConnection c = (ActiveMQConnection) factory.createConnection();
factory.setUseAsyncSend(true);
c.getPrefetchPolicy().setQueuePrefetch(1000);
c.getPrefetchPolicy().setQueueBrowserPrefetch(1000);
@ -276,6 +279,7 @@ public class ServerConnectionFactory {
boolean isTransacted,
String mqServer,
boolean isTopic) throws JMSException {
if (OPENJMS_SERVER.equals(mqServer) || MANTARAY_SERVER.equals(mqServer)) {
if (isTransacted) {
if (isTopic) {
@ -305,9 +309,29 @@ public class ServerConnectionFactory {
} else {
// check when to use Transacted or Non-Transacted type.
if (isTransacted) {
return connection.createSession(true, Session.SESSION_TRANSACTED);
if (isTopic) {
TopicSession session = ((TopicConnection) connection).createTopicSession(false, Session.SESSION_TRANSACTED);
return ((Session) session);
} else {
return connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
QueueSession session = ((QueueConnection) connection).createQueueSession(false, Session.SESSION_TRANSACTED);
return ((Session) session);
}
} else {
if (isTopic) {
TopicSession session = ((TopicConnection) connection).createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
return ((Session) session);
} else {
QueueSession session = ((QueueConnection) connection).createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
return ((Session) session);
}
}
}
}