Update to use the QPid JMS client v0.3.0
This commit is contained in:
Timothy Bish 2015-08-05 12:49:21 -04:00
parent 457dbd8b64
commit a79f317d31
3 changed files with 70 additions and 53 deletions

View File

@ -35,8 +35,8 @@
</dependency>
<dependency>
<groupId>org.apache.qpid</groupId>
<artifactId>qpid-amqp-1-0-client-jms</artifactId>
<version>${qpid-jms-version}</version>
<artifactId>qpid-jms-client</artifactId>
<version>0.3.0</version>
</dependency>
</dependencies>

View File

@ -16,73 +16,83 @@
*/
package example;
import org.apache.qpid.amqp_1_0.jms.impl.*;
import org.apache.qpid.jms.*;
import javax.jms.*;
class Listener {
public static void main(String []args) throws JMSException {
public static void main(String[] args) throws JMSException {
final String TOPIC_PREFIX = "topic://";
String user = env("ACTIVEMQ_USER", "admin");
String password = env("ACTIVEMQ_PASSWORD", "password");
String host = env("ACTIVEMQ_HOST", "localhost");
int port = Integer.parseInt(env("ACTIVEMQ_PORT", "5672"));
String destination = arg(args, 0, "topic://event");
ConnectionFactoryImpl factory = new ConnectionFactoryImpl(host, port, user, password);
Destination dest = null;
if( destination.startsWith("topic://") ) {
dest = new TopicImpl(destination);
} else {
dest = new QueueImpl(destination);
}
String connectionURI = "amqp://" + host + ":" + port;
String destinationName = arg(args, 0, "topic://event");
JmsConnectionFactory factory = new JmsConnectionFactory(connectionURI);
Connection connection = factory.createConnection(user, password);
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(dest);
Destination destination = null;
if (destinationName.startsWith(TOPIC_PREFIX)) {
destination = session.createTopic(destinationName.substring(TOPIC_PREFIX.length()));
} else {
destination = session.createQueue(destinationName);
}
MessageConsumer consumer = session.createConsumer(destination);
long start = System.currentTimeMillis();
long count = 1;
System.out.println("Waiting for messages...");
while(true) {
while (true) {
Message msg = consumer.receive();
if( msg instanceof TextMessage ) {
if (msg instanceof TextMessage) {
String body = ((TextMessage) msg).getText();
if( "SHUTDOWN".equals(body)) {
if ("SHUTDOWN".equals(body)) {
long diff = System.currentTimeMillis() - start;
System.out.println(String.format("Received %d in %.2f seconds", count, (1.0*diff/1000.0)));
System.out.println(String.format("Received %d in %.2f seconds", count, (1.0 * diff / 1000.0)));
connection.close();
try {
Thread.sleep(10);
} catch (Exception e) {}
System.exit(1);
} else {
try {
if( count != msg.getIntProperty("id") ) {
System.out.println("mismatch: "+count+"!="+msg.getIntProperty("id"));
if (count != msg.getIntProperty("id")) {
System.out.println("mismatch: " + count + "!=" + msg.getIntProperty("id"));
}
} catch (NumberFormatException ignore) {
}
if( count == 1 ) {
if (count == 1) {
start = System.currentTimeMillis();
} else if( count % 1000 == 0 ) {
} else if (count % 1000 == 0) {
System.out.println(String.format("Received %d messages.", count));
}
count ++;
count++;
}
} else {
System.out.println("Unexpected message type: "+msg.getClass());
System.out.println("Unexpected message type: " + msg.getClass());
}
}
}
private static String env(String key, String defaultValue) {
String rc = System.getenv(key);
if( rc== null )
if (rc == null)
return defaultValue;
return rc;
}
private static String arg(String []args, int index, String defaultValue) {
if( index < args.length )
private static String arg(String[] args, int index, String defaultValue) {
if (index < args.length)
return args[index];
else
return defaultValue;

View File

@ -16,66 +16,73 @@
*/
package example;
import org.apache.qpid.amqp_1_0.jms.impl.*;
import org.apache.qpid.jms.*;
import javax.jms.*;
class Publisher {
public static void main(String []args) throws Exception {
public static void main(String[] args) throws Exception {
final String TOPIC_PREFIX = "topic://";
String user = env("ACTIVEMQ_USER", "admin");
String password = env("ACTIVEMQ_PASSWORD", "password");
String host = env("ACTIVEMQ_HOST", "localhost");
int port = Integer.parseInt(env("ACTIVEMQ_PORT", "5672"));
String destination = arg(args, 0, "topic://event");
String connectionURI = "amqp://" + host + ":" + port;
String destinationName = arg(args, 0, "topic://event");
int messages = 10000;
int size = 256;
String DATA = "abcdefghijklmnopqrstuvwxyz";
String body = "";
for( int i=0; i < size; i ++) {
body += DATA.charAt(i%DATA.length());
for (int i = 0; i < size; i++) {
body += DATA.charAt(i % DATA.length());
}
ConnectionFactoryImpl factory = new ConnectionFactoryImpl(host, port, user, password);
Destination dest = null;
if( destination.startsWith("topic://") ) {
dest = new TopicImpl(destination);
} else {
dest = new QueueImpl(destination);
}
JmsConnectionFactory factory = new JmsConnectionFactory(connectionURI);
Connection connection = factory.createConnection(user, password);
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(dest);
Destination destination = null;
if (destinationName.startsWith(TOPIC_PREFIX)) {
destination = session.createTopic(destinationName.substring(TOPIC_PREFIX.length()));
} else {
destination = session.createQueue(destinationName);
}
MessageProducer producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
for( int i=1; i <= messages; i ++) {
TextMessage msg = session.createTextMessage("#:"+i);
for (int i = 1; i <= messages; i++) {
TextMessage msg = session.createTextMessage("#:" + i);
msg.setIntProperty("id", i);
producer.send(msg);
if( (i % 1000) == 0) {
if ((i % 1000) == 0) {
System.out.println(String.format("Sent %d messages", i));
}
}
producer.send(session.createTextMessage("SHUTDOWN"));
Thread.sleep(1000*3);
Thread.sleep(1000 * 3);
connection.close();
System.exit(0);
}
private static String env(String key, String defaultValue) {
String rc = System.getenv(key);
if( rc== null )
if (rc == null)
return defaultValue;
return rc;
}
private static String arg(String []args, int index, String defaultValue) {
if( index < args.length )
private static String arg(String[] args, int index, String defaultValue) {
if (index < args.length)
return args[index];
else
return defaultValue;