mirror of https://github.com/apache/activemq.git
test for replayed committed transaction on lost commit reply
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@891622 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
b9e51d6492
commit
1606c59b9e
|
@ -19,6 +19,7 @@ package org.apache.activemq.transport.failover;
|
|||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertNull;
|
||||
|
||||
import java.util.concurrent.Executors;
|
||||
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.MessageConsumer;
|
||||
|
@ -28,7 +29,13 @@ import javax.jms.Session;
|
|||
import javax.jms.TextMessage;
|
||||
|
||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.broker.BrokerPlugin;
|
||||
import org.apache.activemq.broker.BrokerPluginSupport;
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
import org.apache.activemq.broker.ConnectionContext;
|
||||
import org.apache.activemq.command.TransactionId;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
@ -36,6 +43,7 @@ import org.junit.Test;
|
|||
// see https://issues.apache.org/activemq/browse/AMQ-2473
|
||||
public class FailoverTransactionTest {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(FailoverTransactionTest.class);
|
||||
private static final String QUEUE_NAME = "test.FailoverTransactionTest";
|
||||
private String url = "tcp://localhost:61616";
|
||||
BrokerService broker;
|
||||
|
@ -53,11 +61,16 @@ public class FailoverTransactionTest {
|
|||
}
|
||||
|
||||
public void startBroker(boolean deleteAllMessagesOnStartup) throws Exception {
|
||||
broker = createBroker(deleteAllMessagesOnStartup);
|
||||
broker.start();
|
||||
}
|
||||
|
||||
public BrokerService createBroker(boolean deleteAllMessagesOnStartup) throws Exception {
|
||||
broker = new BrokerService();
|
||||
broker.setUseJmx(false);
|
||||
broker.addConnector(url);
|
||||
broker.setDeleteAllMessagesOnStartup(true);
|
||||
broker.start();
|
||||
return broker;
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -88,6 +101,69 @@ public class FailoverTransactionTest {
|
|||
connection.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFailoverCommitReplyLost() throws Exception {
|
||||
|
||||
broker.stop();
|
||||
|
||||
broker = createBroker(true);
|
||||
broker.setPlugins(new BrokerPlugin[] {
|
||||
new BrokerPluginSupport() {
|
||||
@Override
|
||||
public void commitTransaction(ConnectionContext context,
|
||||
TransactionId xid, boolean onePhase) throws Exception {
|
||||
super.commitTransaction(context, xid, onePhase);
|
||||
// so commit will hang as if reply is lost
|
||||
context.setDontSendReponse(true);
|
||||
Executors.newSingleThreadExecutor().execute(new Runnable() {
|
||||
public void run() {
|
||||
LOG.info("Stopping broker post commit...");
|
||||
try {
|
||||
broker.stop();
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
});
|
||||
broker.start();
|
||||
|
||||
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
|
||||
Connection connection = cf.createConnection();
|
||||
connection.start();
|
||||
final Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
|
||||
Queue destination = session.createQueue(QUEUE_NAME);
|
||||
|
||||
MessageConsumer consumer = session.createConsumer(destination);
|
||||
MessageProducer producer = session.createProducer(destination);
|
||||
|
||||
TextMessage message = session.createTextMessage("Test message");
|
||||
producer.send(message);
|
||||
|
||||
// broker will die on commit reply so this will hang till restart
|
||||
Executors.newSingleThreadExecutor().execute(new Runnable() {
|
||||
public void run() {
|
||||
LOG.info("doing async commit...");
|
||||
try {
|
||||
session.commit();
|
||||
LOG.info("done async commit");
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
broker.waitUntilStopped();
|
||||
startBroker(false);
|
||||
|
||||
assertNotNull("we got the message", consumer.receive(20000));
|
||||
assertNull("we got just one message", consumer.receive(2000));
|
||||
session.commit();
|
||||
connection.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFailoverProducerCloseBeforeTransactionFailWhenDisabled() throws Exception {
|
||||
|
||||
|
|
Loading…
Reference in New Issue