mirror of https://github.com/apache/activemq.git
test case to check sending and receiving of messages inside a transaction (http://forums.activemq.org/posts/list/364.page)
git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@378483 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
43112670da
commit
8b07914259
|
@ -0,0 +1,180 @@
|
|||
/**
|
||||
*
|
||||
* Copyright 2005-2006 The Apache Software Foundation
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.usecases;
|
||||
|
||||
import junit.framework.TestCase;
|
||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.command.ActiveMQQueue;
|
||||
|
||||
import javax.jms.*;
|
||||
import java.util.List;
|
||||
import java.util.Collections;
|
||||
import java.util.ArrayList;
|
||||
import java.io.File;
|
||||
|
||||
|
||||
public final class PublishOnQueueConsumedMessageInTransactionTest extends TestCase implements MessageListener {
|
||||
|
||||
private Session producerSession;
|
||||
private Session consumerSession;
|
||||
private Destination queue;
|
||||
private ActiveMQConnectionFactory factory;
|
||||
private MessageProducer producer;
|
||||
private MessageConsumer consumer;
|
||||
private Connection connection;
|
||||
private ObjectMessage objectMessage = null;
|
||||
private List messages = createConcurrentList();
|
||||
private final Object lock = new Object();
|
||||
private String[] data;
|
||||
private String DATAFILE_ROOT = "activemq-data";
|
||||
private int messageCount = 3;
|
||||
private String url = "vm://localhost";
|
||||
|
||||
// Invalid acknowledgment warning can be viewed on the console of a remote broker
|
||||
// The warning message is not thrown back to the client
|
||||
//private String url = "tcp://localhost:61616";
|
||||
|
||||
|
||||
protected void setUp() throws Exception {
|
||||
File dataFile = new File(DATAFILE_ROOT);
|
||||
recursiveDelete(dataFile);
|
||||
try {
|
||||
factory = new ActiveMQConnectionFactory(url);
|
||||
connection = factory.createConnection();
|
||||
producerSession = connection.createSession(true, Session.SESSION_TRANSACTED);
|
||||
consumerSession = connection.createSession(true, Session.SESSION_TRANSACTED);
|
||||
queue = new ActiveMQQueue("FOO.BAR");
|
||||
data = new String[messageCount];
|
||||
|
||||
for (int i = 0; i < messageCount; i++) {
|
||||
data[i] = "Message : " + i;
|
||||
}
|
||||
} catch (JMSException je) {
|
||||
fail("Error setting up connection : " + je.toString());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public void testSendReceive() throws Exception {
|
||||
sendMessage();
|
||||
|
||||
connection.start();
|
||||
consumer = consumerSession.createConsumer(queue);
|
||||
consumer.setMessageListener(this);
|
||||
waitForMessagesToBeDelivered();
|
||||
assertEquals("Messages received doesn't equal messages sent", messages.size(),data.length);
|
||||
|
||||
}
|
||||
|
||||
|
||||
protected void sendMessage() throws JMSException {
|
||||
messages.clear();
|
||||
try {
|
||||
for (int i = 0; i < data.length; ++i) {
|
||||
producer = producerSession.createProducer(queue);
|
||||
objectMessage = producerSession.createObjectMessage(data[i]);
|
||||
producer.send(objectMessage);
|
||||
producerSession.commit();
|
||||
System.out.println("sending message :" + objectMessage);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
if (producerSession != null) {
|
||||
producerSession.rollback();
|
||||
System.out.println("rollback");
|
||||
producerSession.close();
|
||||
}
|
||||
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public synchronized void onMessage(Message m) {
|
||||
try {
|
||||
objectMessage = (ObjectMessage) m;
|
||||
consumeMessage(objectMessage,messages);
|
||||
|
||||
System.out.println("consumer received message :" + objectMessage);
|
||||
consumerSession.commit();
|
||||
|
||||
} catch (Exception e) {
|
||||
try {
|
||||
consumerSession.rollback();
|
||||
System.out.println("rolled back transaction");
|
||||
} catch (JMSException e1) {
|
||||
System.out.println(e1);
|
||||
e1.printStackTrace();
|
||||
}
|
||||
System.out.println(e);
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
protected void consumeMessage(Message message, List messageList) {
|
||||
messageList.add(message);
|
||||
if (messageList.size() >= data.length) {
|
||||
synchronized (lock) {
|
||||
lock.notifyAll();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
protected List createConcurrentList() {
|
||||
return Collections.synchronizedList(new ArrayList());
|
||||
}
|
||||
|
||||
|
||||
protected void waitForMessagesToBeDelivered() {
|
||||
long maxWaitTime = 5000;
|
||||
long waitTime = maxWaitTime;
|
||||
long start = (maxWaitTime <= 0) ? 0 : System.currentTimeMillis();
|
||||
|
||||
synchronized (lock) {
|
||||
while (messages.size() <= data.length && waitTime >= 0) {
|
||||
try {
|
||||
lock.wait(200);
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
|
||||
waitTime = maxWaitTime - (System.currentTimeMillis() - start);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
protected static void recursiveDelete(File file) {
|
||||
if( file.isDirectory() ) {
|
||||
File[] files = file.listFiles();
|
||||
for (int i = 0; i < files.length; i++) {
|
||||
recursiveDelete(files[i]);
|
||||
}
|
||||
}
|
||||
file.delete();
|
||||
}
|
||||
|
||||
protected void tearDown() throws Exception {
|
||||
if (connection != null) {
|
||||
connection.close();
|
||||
}
|
||||
|
||||
super.tearDown();
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue