mirror of https://github.com/apache/activemq.git
added test case to simulate "javax.jms.JMSException: Transaction 'TX:ID:...' has not been started." exception
this test appears to manifest consistently on a MacBook. Haven't been able to reproduce this on windows though. Is excluded by default as the test can sometime take too long to execute git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@564505 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
fc00993839
commit
1963ea4006
|
@ -309,6 +309,7 @@
|
|||
<excludes>
|
||||
|
||||
<!-- These tests run too slow to execute as part of the unit tests -->
|
||||
<exclude>**/TransactionNotStartedErrorTest.*</exclude>
|
||||
<exclude>**/DefaultStoreBrokerTest.*</exclude>
|
||||
<exclude>**/TcpTransportBrokerTest.*</exclude>
|
||||
<exclude>**/activeio/*</exclude>
|
||||
|
|
|
@ -0,0 +1,25 @@
|
|||
package org.apache.activemq.bugs;
|
||||
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.MessageProducer;
|
||||
import javax.jms.ObjectMessage;
|
||||
import javax.jms.Session;
|
||||
|
||||
public class MessageSender {
|
||||
private MessageProducer producer;
|
||||
private Session session;
|
||||
|
||||
public MessageSender(String queueName,Connection connection, boolean useTransactedSession) throws Exception {
|
||||
session = useTransactedSession ? connection.createSession(true, Session.SESSION_TRANSACTED) : connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
producer = session.createProducer(session.createQueue(queueName));
|
||||
}
|
||||
|
||||
public void send(String payload) throws Exception {
|
||||
ObjectMessage message = session.createObjectMessage();
|
||||
message.setObject(payload);
|
||||
producer.send(message);
|
||||
if (session.getTransacted()) {
|
||||
session.commit();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,5 @@
|
|||
package org.apache.activemq.bugs;
|
||||
|
||||
public interface Receiver {
|
||||
public void receive(String s) throws Exception;
|
||||
}
|
|
@ -0,0 +1,303 @@
|
|||
/**
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.bugs;
|
||||
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.Message;
|
||||
import javax.jms.MessageConsumer;
|
||||
import javax.jms.MessageListener;
|
||||
|
||||
import javax.jms.ObjectMessage;
|
||||
import javax.jms.Session;
|
||||
import junit.framework.TestCase;
|
||||
|
||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
import org.apache.activemq.bugs.Receiver;
|
||||
import org.apache.activemq.bugs.MessageSender;
|
||||
|
||||
/*
|
||||
* simulate message flow which cause the following exception
|
||||
* in the broker (exception logged by client)
|
||||
* <p/>
|
||||
* 2007-07-24 13:51:23,624 com.easynet.halo.Halo ERROR (LoggingErrorHandler.java: 23) JMS failure
|
||||
* javax.jms.JMSException: Transaction 'TX:ID:dmt-53625-1185281414694-1:0:344' has not been started.
|
||||
* at org.apache.activemq.broker.TransactionBroker.getTransaction(TransactionBroker.java:230)
|
||||
*
|
||||
*
|
||||
* This appears to be consistent in a MacBook. Haven't been able to replicate it on Windows though
|
||||
*/
|
||||
public class TransactionNotStartedErrorTest extends TestCase {
|
||||
|
||||
private static final org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory
|
||||
.getLog(TransactionNotStartedErrorTest.class);
|
||||
private String hectorToHalo = "hectorToHalo";
|
||||
private String xenaToHalo = "xenaToHalo";
|
||||
private String troyToHalo = "troyToHalo";
|
||||
|
||||
private String haloToHector = "haloToHector";
|
||||
private String haloToXena = "haloToXena";
|
||||
private String haloToTroy = "haloToTroy";
|
||||
|
||||
private static int counter = 500;
|
||||
|
||||
private static int hectorToHaloCtr = 0;
|
||||
private static int xenaToHaloCtr = 0;
|
||||
private static int troyToHaloCtr = 0;
|
||||
|
||||
private static int haloToHectorCtr = 0;
|
||||
private static int haloToXenaCtr = 0;
|
||||
private static int haloToTroyCtr = 0;
|
||||
|
||||
private BrokerService broker;
|
||||
|
||||
private Connection hectorConnection;
|
||||
private Connection xenaConnection;
|
||||
private Connection troyConnection;
|
||||
private Connection haloConnection;
|
||||
|
||||
private final Object lock = new Object();
|
||||
|
||||
public Connection createConnection() throws JMSException {
|
||||
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
|
||||
"tcp://localhost:61616");
|
||||
return factory.createConnection();
|
||||
}
|
||||
|
||||
public Session createSession(Connection connection, boolean transacted)
|
||||
throws JMSException {
|
||||
return connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE);
|
||||
}
|
||||
|
||||
public void startBroker() throws Exception {
|
||||
broker = new BrokerService();
|
||||
broker.setDeleteAllMessagesOnStartup(true);
|
||||
broker.setPersistent(true);
|
||||
broker.setUseJmx(true);
|
||||
broker.addConnector("tcp://localhost:61616").setName("Default");
|
||||
broker.start();
|
||||
log.info("Starting broker..");
|
||||
}
|
||||
|
||||
public void tearDown() throws Exception {
|
||||
hectorConnection.close();
|
||||
xenaConnection.close();
|
||||
troyConnection.close();
|
||||
haloConnection.close();
|
||||
broker.stop();
|
||||
}
|
||||
|
||||
public void testTransactionNotStartedError() throws Exception {
|
||||
startBroker();
|
||||
hectorConnection = createConnection();
|
||||
Thread hectorThread = buildProducer(hectorConnection, hectorToHalo);
|
||||
Receiver hHectorReceiver = new Receiver() {
|
||||
public void receive(String s) throws Exception {
|
||||
haloToHectorCtr++;
|
||||
if (haloToHectorCtr >= counter) {
|
||||
synchronized (lock) {
|
||||
lock.notifyAll();
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
buildReceiver(hectorConnection, haloToHector, false, hHectorReceiver);
|
||||
|
||||
troyConnection = createConnection();
|
||||
Thread troyThread = buildProducer(troyConnection, troyToHalo);
|
||||
Receiver hTroyReceiver = new Receiver() {
|
||||
public void receive(String s) throws Exception {
|
||||
haloToTroyCtr++;
|
||||
if (haloToTroyCtr >= counter) {
|
||||
synchronized (lock) {
|
||||
lock.notifyAll();
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
buildReceiver(hectorConnection, haloToTroy, false, hTroyReceiver);
|
||||
|
||||
xenaConnection = createConnection();
|
||||
Thread xenaThread = buildProducer(xenaConnection, xenaToHalo);
|
||||
Receiver hXenaReceiver = new Receiver() {
|
||||
public void receive(String s) throws Exception {
|
||||
haloToXenaCtr++;
|
||||
if (haloToXenaCtr >= counter) {
|
||||
synchronized (lock) {
|
||||
lock.notifyAll();
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
buildReceiver(xenaConnection, haloToXena, false, hXenaReceiver);
|
||||
|
||||
haloConnection = createConnection();
|
||||
final MessageSender hectorSender = buildTransactionalProducer(
|
||||
haloToHector, haloConnection);
|
||||
final MessageSender troySender = buildTransactionalProducer(haloToTroy,
|
||||
haloConnection);
|
||||
final MessageSender xenaSender = buildTransactionalProducer(haloToXena,
|
||||
haloConnection);
|
||||
Receiver hectorReceiver = new Receiver() {
|
||||
public void receive(String s) throws Exception {
|
||||
hectorToHaloCtr++;
|
||||
troySender.send("halo to troy because of hector");
|
||||
if (hectorToHaloCtr >= counter) {
|
||||
synchronized (lock) {
|
||||
lock.notifyAll();
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
Receiver xenaReceiver = new Receiver() {
|
||||
public void receive(String s) throws Exception {
|
||||
xenaToHaloCtr++;
|
||||
hectorSender.send("halo to hector because of xena");
|
||||
if (xenaToHaloCtr >= counter) {
|
||||
synchronized (lock) {
|
||||
lock.notifyAll();
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
Receiver troyReceiver = new Receiver() {
|
||||
public void receive(String s) throws Exception {
|
||||
troyToHaloCtr++;
|
||||
xenaSender.send("halo to xena because of troy");
|
||||
if (troyToHaloCtr >= counter) {
|
||||
synchronized (lock) {
|
||||
lock.notifyAll();
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
buildReceiver(haloConnection, hectorToHalo, true, hectorReceiver);
|
||||
buildReceiver(haloConnection, xenaToHalo, true, xenaReceiver);
|
||||
buildReceiver(haloConnection, troyToHalo, true, troyReceiver);
|
||||
|
||||
haloConnection.start();
|
||||
|
||||
troyConnection.start();
|
||||
troyThread.start();
|
||||
|
||||
xenaConnection.start();
|
||||
xenaThread.start();
|
||||
|
||||
hectorConnection.start();
|
||||
hectorThread.start();
|
||||
waitForMessagesToBeDelivered();
|
||||
//number of messages received should match messages sent
|
||||
assertEquals(hectorToHaloCtr, counter);
|
||||
log.info("hectorToHalo received " + hectorToHaloCtr + " messages");
|
||||
assertEquals(xenaToHaloCtr, counter);
|
||||
log.info("xenaToHalo received " + xenaToHaloCtr + " messages");
|
||||
assertEquals(troyToHaloCtr, counter);
|
||||
log.info("troyToHalo received " + troyToHaloCtr + " messages");
|
||||
assertEquals(haloToHectorCtr, counter);
|
||||
log.info("haloToHector received " + haloToHectorCtr + " messages");
|
||||
assertEquals(haloToXenaCtr, counter);
|
||||
log.info("haloToXena received " + haloToXenaCtr + " messages");
|
||||
assertEquals(haloToTroyCtr, counter);
|
||||
log.info("haloToTroy received " + haloToTroyCtr + " messages");
|
||||
|
||||
}
|
||||
|
||||
protected void waitForMessagesToBeDelivered() {
|
||||
// let's give the listeners enough time to read all messages
|
||||
long maxWaitTime = counter * 3000;
|
||||
long waitTime = maxWaitTime;
|
||||
long start = (maxWaitTime <= 0) ? 0 : System.currentTimeMillis();
|
||||
|
||||
synchronized (lock) {
|
||||
boolean hasMessages = true;
|
||||
while (hasMessages && waitTime >= 0) {
|
||||
try {
|
||||
lock.wait(200);
|
||||
} catch (InterruptedException e) {
|
||||
log.error(e);
|
||||
}
|
||||
//check if all messages have been received
|
||||
hasMessages = hectorToHaloCtr < counter
|
||||
|| xenaToHaloCtr < counter || troyToHaloCtr < counter || haloToHectorCtr < counter
|
||||
|| haloToXenaCtr < counter || haloToTroyCtr < counter;
|
||||
waitTime = maxWaitTime - (System.currentTimeMillis() - start);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public MessageSender buildTransactionalProducer(String queueName,
|
||||
Connection connection) throws Exception {
|
||||
|
||||
return new MessageSender(queueName, connection, true);
|
||||
}
|
||||
|
||||
public Thread buildProducer(Connection connection, final String queueName)
|
||||
throws Exception {
|
||||
|
||||
final Session session = connection.createSession(false,
|
||||
Session.AUTO_ACKNOWLEDGE);
|
||||
final MessageSender producer = new MessageSender(queueName, connection,
|
||||
false);
|
||||
Thread thread = new Thread() {
|
||||
|
||||
public synchronized void run() {
|
||||
for (int i = 0; i < counter; i++) {
|
||||
try {
|
||||
producer.send(queueName);
|
||||
if (session.getTransacted()) {
|
||||
session.commit();
|
||||
}
|
||||
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException("on " + queueName + " send",
|
||||
e);
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
return thread;
|
||||
}
|
||||
|
||||
public void buildReceiver(Connection connection, final String queueName,
|
||||
boolean transacted, final Receiver receiver) throws Exception {
|
||||
final Session session = transacted ? connection.createSession(true,
|
||||
Session.SESSION_TRANSACTED) : connection.createSession(false,
|
||||
Session.AUTO_ACKNOWLEDGE);
|
||||
MessageConsumer inputMessageConsumer = session.createConsumer(session
|
||||
.createQueue(queueName));
|
||||
MessageListener messageListener = new MessageListener() {
|
||||
|
||||
public void onMessage(Message message) {
|
||||
try {
|
||||
ObjectMessage objectMessage = (ObjectMessage) message;
|
||||
String s = (String) objectMessage.getObject();
|
||||
receiver.receive(s);
|
||||
if (session.getTransacted()) {
|
||||
session.commit();
|
||||
}
|
||||
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
};
|
||||
inputMessageConsumer.setMessageListener(messageListener);
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue