git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@673433 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2008-07-02 16:22:59 +00:00
parent 3f4d4a5f7a
commit 4118d02a95
4 changed files with 23 additions and 7 deletions

View File

@ -941,7 +941,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
try { try {
transport.stop(); transport.stop();
LOG.debug("Stopped connection: " + transport.getRemoteAddress()); LOG.debug("Stopped transport: " + transport.getRemoteAddress());
} catch (Exception e) { } catch (Exception e) {
LOG.debug("Could not stop transport: " + e, e); LOG.debug("Could not stop transport: " + e, e);
} }

View File

@ -383,6 +383,8 @@ public class ConnectionStateTracker extends CommandVisitorAdapter {
ConnectionState cs = connectionStates.get(connectionId); ConnectionState cs = connectionStates.get(connectionId);
if (cs != null) { if (cs != null) {
cs.addTransactionState(info.getTransactionId()); cs.addTransactionState(info.getTransactionId());
TransactionState state = cs.getTransactionState(info.getTransactionId());
state.addCommand(info);
} }
} }
return TRACKED_RESPONSE_MARKER; return TRACKED_RESPONSE_MARKER;

View File

@ -131,7 +131,7 @@ public abstract class JmsTransactionTestSupport extends TestSupport implements M
for (int i = 0; i < batchSize; i++) { for (int i = 0; i < batchSize; i++) {
producer.send(message); producer.send(message);
} }
messageSent();
session.commit(); session.commit();
LOG.info("Consuming bacth " + j + " of " + batchSize + " messages"); LOG.info("Consuming bacth " + j + " of " + batchSize + " messages");
@ -145,6 +145,9 @@ public abstract class JmsTransactionTestSupport extends TestSupport implements M
} }
} }
protected void messageSent() throws Exception {
}
/** /**
* Sends a batch of messages and validates that the rollbacked message was * Sends a batch of messages and validates that the rollbacked message was
* not consumed. * not consumed.

View File

@ -30,6 +30,7 @@ public class TransactedTopicMasterSlaveTest extends JmsTopicTransactionTest {
protected int inflightMessageCount; protected int inflightMessageCount;
protected int failureCount = 50; protected int failureCount = 50;
protected String uriString = "failover://(tcp://localhost:62001,tcp://localhost:62002)?randomize=false"; protected String uriString = "failover://(tcp://localhost:62001,tcp://localhost:62002)?randomize=false";
private boolean stopMaster = false;
protected void setUp() throws Exception { protected void setUp() throws Exception {
failureCount = super.batchCount / 2; failureCount = super.batchCount / 2;
@ -76,12 +77,22 @@ public class TransactedTopicMasterSlaveTest extends JmsTopicTransactionTest {
return new ActiveMQConnectionFactory(uriString); return new ActiveMQConnectionFactory(uriString);
} }
protected void messageSent() throws Exception { public void testSendReceiveTransactedBatchesWithMasterStop() throws Exception {
try {
stopMaster = true;
testSendReceiveTransactedBatches();
} finally {
stopMaster = false;
}
}
protected void messageSent() throws Exception {
if (stopMaster) {
if (++inflightMessageCount >= failureCount) { if (++inflightMessageCount >= failureCount) {
inflightMessageCount = 0; inflightMessageCount = 0;
Thread.sleep(1000); Thread.sleep(1000);
broker.stop(); broker.stop();
} }
} }
}
} }