fix up failure - still leveldb variant problem that needs work - testQueueTransactionalOrderWithRestart - org.apache.activemq.bugs.AMQ2149LevelDBTest

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1512332 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2013-08-09 15:11:11 +00:00
parent 1773e2c11c
commit 7c50c1c736
2 changed files with 86 additions and 20 deletions

View File

@ -450,7 +450,7 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
def rollback(txid: TransactionId) = {
transactions.remove(txid) match {
case null =>
println("The transaction does not exist")
debug("on rollback, the transaction " + txid + " does not exist")
case tx =>
if( tx.prepared ) {
val done = new CountDownLatch(1)
@ -470,7 +470,7 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
def prepare(tx: TransactionId) = {
transactions.get(tx) match {
case null =>
println("The transaction does not exist")
warn("on prepare, the transaction " + tx + " does not exist")
case tx =>
tx.prepare
}

View File

@ -18,20 +18,13 @@
package org.apache.activemq.bugs;
import java.io.File;
import java.lang.IllegalStateException;
import java.util.HashSet;
import java.util.Timer;
import java.util.TimerTask;
import java.util.Vector;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TransactionRolledBackException;
import javax.jms.*;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.AutoFailTestSupport;
@ -127,6 +120,7 @@ public class AMQ2149Test extends AutoFailTestSupport {
return stringBuilder.toString();
}
HashSet<Connection> connections = new HashSet<Connection>();
private class Receiver implements MessageListener {
private final javax.jms.Destination dest;
@ -157,6 +151,7 @@ public class AMQ2149Test extends AutoFailTestSupport {
}
messageConsumer.setMessageListener(this);
connection.start();
connections.add(connection);
}
public void close() throws JMSException {
@ -208,6 +203,8 @@ public class AMQ2149Test extends AutoFailTestSupport {
// in doubt - either commit command or reply missing
// don't know if we will get a replay
resumeOnNextOrPreviousIsOk = true;
nextExpectedSeqNum++;
LOG.info("in doubt transaction completion: ok to get next or previous batch. next:" + nextExpectedSeqNum);
} else {
resumeOnNextOrPreviousIsOk = false;
// batch will be replayed
@ -242,6 +239,7 @@ public class AMQ2149Test extends AutoFailTestSupport {
messageProducer = session.createProducer(dest);
messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
connection.start();
connections.add(connection);
}
public void run() {
@ -258,7 +256,11 @@ public class AMQ2149Test extends AutoFailTestSupport {
if ((nextSequenceNumber % 500) == 0) {
LOG.info(dest + " sent " + nextSequenceNumber);
}
} catch (javax.jms.IllegalStateException e) {
LOG.error(dest + " bailing on send error", e);
exceptions.add(e);
break;
} catch (Exception e) {
LOG.error(dest + " send error", e);
exceptions.add(e);
@ -278,6 +280,52 @@ public class AMQ2149Test extends AutoFailTestSupport {
}
}
// attempt to simply replicate leveldb failure. no joy yet
public void x_testRestartReReceive() throws Exception {
createBroker(new Configurer() {
public void configure(BrokerService broker) throws Exception {
broker.deleteAllMessages();
}
});
final javax.jms.Destination destination =
ActiveMQDestination.createDestination("test.dest.X", ActiveMQDestination.QUEUE_TYPE);
Thread thread = new Thread(new Sender(destination));
thread.start();
thread.join();
Connection connection = new ActiveMQConnectionFactory(brokerURL).createConnection();
connection.setClientID(destination.toString());
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
MessageConsumer messageConsumer = session.createConsumer(destination);
connection.start();
int batch = 200;
long expectedSeq;
final TimerTask restartTask = schedualRestartTask(null, new Configurer() {
public void configure(BrokerService broker) throws Exception {
}
});
expectedSeq = 0;
for (int s = 0; s < 4; s++) {
for (int i = 0; i < batch; i++) {
Message message = messageConsumer.receive(20000);
assertNotNull("s:" + s + ", i:" + i, message);
final long seqNum = message.getLongProperty(SEQ_NUM_PROPERTY);
assertEquals("expected order s:" + s, expectedSeq++, seqNum);
if (i > 0 && i%600 == 0) {
LOG.info("Commit on %5");
// session.commit();
}
}
restartTask.run();
}
}
// no need to run this unless there are some issues with the others
public void vanilaVerify_testOrder() throws Exception {
@ -381,7 +429,7 @@ public class AMQ2149Test extends AutoFailTestSupport {
}
}
private void schedualRestartTask(final Timer timer, final Configurer configurer) {
private TimerTask schedualRestartTask(final Timer timer, final Configurer configurer) {
class RestartTask extends TimerTask {
public void run() {
synchronized (brokerLock) {
@ -402,18 +450,22 @@ public class AMQ2149Test extends AutoFailTestSupport {
exceptions.add(e);
}
}
if (++numBrokerRestarts < MAX_BROKER_RESTARTS) {
if (++numBrokerRestarts < MAX_BROKER_RESTARTS && timer != null) {
// do it again
try {
timer.schedule(new RestartTask(), brokerStopPeriod);
} catch (IllegalStateException ignore_alreadyCancelled) {
} catch (IllegalStateException ignore_alreadyCancelled) {
}
} else {
LOG.info("no longer stopping broker on reaching Max restarts: " + MAX_BROKER_RESTARTS);
}
}
}
}
timer.schedule(new RestartTask(), brokerStopPeriod);
RestartTask task = new RestartTask();
if (timer != null) {
timer.schedule(task, brokerStopPeriod);
}
return task;
}
private void verifyOrderedMessageReceipt(byte destinationType) throws Exception {
@ -446,8 +498,8 @@ public class AMQ2149Test extends AutoFailTestSupport {
threads.remove(sendThread);
}
}
LOG.info("senders done...");
LOG.info("senders done..." + threads);
while(!receivers.isEmpty() && System.currentTimeMillis() < expiry) {
Receiver receiver = receivers.firstElement();
if (receiver.getNextExpectedSeqNo() >= numtoSend || !exceptions.isEmpty()) {
@ -459,6 +511,20 @@ public class AMQ2149Test extends AutoFailTestSupport {
if (!exceptions.isEmpty()) {
exceptions.get(0).printStackTrace();
}
for (Connection connection : connections) {
try {
connection.close();
} catch (Exception ignored) {}
}
connections.clear();
LOG.info("Dangling threads: " + threads);
for (Thread dangling : threads) {
dangling.interrupt();
dangling.join(10*1000);
}
assertTrue("No exceptions", exceptions.isEmpty());
}