From 7c50c1c7363f1b582075e7a54b0254bc1eff0b7f Mon Sep 17 00:00:00 2001 From: Gary Tully Date: Fri, 9 Aug 2013 15:11:11 +0000 Subject: [PATCH] fix up failure - still leveldb variant problem that needs work - testQueueTransactionalOrderWithRestart - org.apache.activemq.bugs.AMQ2149LevelDBTest git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1512332 13f79535-47bb-0310-9956-ffa450edef68 --- .../activemq/leveldb/LevelDBStore.scala | 4 +- .../org/apache/activemq/bugs/AMQ2149Test.java | 102 ++++++++++++++---- 2 files changed, 86 insertions(+), 20 deletions(-) diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala index 7f6e193439..24cf033c52 100644 --- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala +++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala @@ -450,7 +450,7 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P def rollback(txid: TransactionId) = { transactions.remove(txid) match { case null => - println("The transaction does not exist") + debug("on rollback, the transaction " + txid + " does not exist") case tx => if( tx.prepared ) { val done = new CountDownLatch(1) @@ -470,7 +470,7 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P def prepare(tx: TransactionId) = { transactions.get(tx) match { case null => - println("The transaction does not exist") + warn("on prepare, the transaction " + tx + " does not exist") case tx => tx.prepare } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java index b839131135..a1b1058a15 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java @@ -18,20 +18,13 @@ package org.apache.activemq.bugs; import java.io.File; +import java.lang.IllegalStateException; +import java.util.HashSet; import java.util.Timer; import java.util.TimerTask; import java.util.Vector; -import javax.jms.Connection; -import javax.jms.DeliveryMode; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.Topic; -import javax.jms.TransactionRolledBackException; +import javax.jms.*; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.AutoFailTestSupport; @@ -127,6 +120,7 @@ public class AMQ2149Test extends AutoFailTestSupport { return stringBuilder.toString(); } + HashSet connections = new HashSet(); private class Receiver implements MessageListener { private final javax.jms.Destination dest; @@ -157,6 +151,7 @@ public class AMQ2149Test extends AutoFailTestSupport { } messageConsumer.setMessageListener(this); connection.start(); + connections.add(connection); } public void close() throws JMSException { @@ -208,6 +203,8 @@ public class AMQ2149Test extends AutoFailTestSupport { // in doubt - either commit command or reply missing // don't know if we will get a replay resumeOnNextOrPreviousIsOk = true; + nextExpectedSeqNum++; + LOG.info("in doubt transaction completion: ok to get next or previous batch. next:" + nextExpectedSeqNum); } else { resumeOnNextOrPreviousIsOk = false; // batch will be replayed @@ -242,6 +239,7 @@ public class AMQ2149Test extends AutoFailTestSupport { messageProducer = session.createProducer(dest); messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT); connection.start(); + connections.add(connection); } public void run() { @@ -258,7 +256,11 @@ public class AMQ2149Test extends AutoFailTestSupport { if ((nextSequenceNumber % 500) == 0) { LOG.info(dest + " sent " + nextSequenceNumber); } - + + } catch (javax.jms.IllegalStateException e) { + LOG.error(dest + " bailing on send error", e); + exceptions.add(e); + break; } catch (Exception e) { LOG.error(dest + " send error", e); exceptions.add(e); @@ -278,6 +280,52 @@ public class AMQ2149Test extends AutoFailTestSupport { } } + // attempt to simply replicate leveldb failure. no joy yet + public void x_testRestartReReceive() throws Exception { + createBroker(new Configurer() { + public void configure(BrokerService broker) throws Exception { + broker.deleteAllMessages(); + } + }); + + final javax.jms.Destination destination = + ActiveMQDestination.createDestination("test.dest.X", ActiveMQDestination.QUEUE_TYPE); + Thread thread = new Thread(new Sender(destination)); + thread.start(); + thread.join(); + + Connection connection = new ActiveMQConnectionFactory(brokerURL).createConnection(); + connection.setClientID(destination.toString()); + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + MessageConsumer messageConsumer = session.createConsumer(destination); + connection.start(); + + int batch = 200; + long expectedSeq; + + final TimerTask restartTask = schedualRestartTask(null, new Configurer() { + public void configure(BrokerService broker) throws Exception { + } + }); + + expectedSeq = 0; + for (int s = 0; s < 4; s++) { + for (int i = 0; i < batch; i++) { + Message message = messageConsumer.receive(20000); + assertNotNull("s:" + s + ", i:" + i, message); + final long seqNum = message.getLongProperty(SEQ_NUM_PROPERTY); + assertEquals("expected order s:" + s, expectedSeq++, seqNum); + + if (i > 0 && i%600 == 0) { + LOG.info("Commit on %5"); + // session.commit(); + } + } + restartTask.run(); + } + + } + // no need to run this unless there are some issues with the others public void vanilaVerify_testOrder() throws Exception { @@ -381,7 +429,7 @@ public class AMQ2149Test extends AutoFailTestSupport { } } - private void schedualRestartTask(final Timer timer, final Configurer configurer) { + private TimerTask schedualRestartTask(final Timer timer, final Configurer configurer) { class RestartTask extends TimerTask { public void run() { synchronized (brokerLock) { @@ -402,18 +450,22 @@ public class AMQ2149Test extends AutoFailTestSupport { exceptions.add(e); } } - if (++numBrokerRestarts < MAX_BROKER_RESTARTS) { + if (++numBrokerRestarts < MAX_BROKER_RESTARTS && timer != null) { // do it again try { timer.schedule(new RestartTask(), brokerStopPeriod); - } catch (IllegalStateException ignore_alreadyCancelled) { + } catch (IllegalStateException ignore_alreadyCancelled) { } } else { LOG.info("no longer stopping broker on reaching Max restarts: " + MAX_BROKER_RESTARTS); } - } + } } - timer.schedule(new RestartTask(), brokerStopPeriod); + RestartTask task = new RestartTask(); + if (timer != null) { + timer.schedule(task, brokerStopPeriod); + } + return task; } private void verifyOrderedMessageReceipt(byte destinationType) throws Exception { @@ -446,8 +498,8 @@ public class AMQ2149Test extends AutoFailTestSupport { threads.remove(sendThread); } } - LOG.info("senders done..."); - + LOG.info("senders done..." + threads); + while(!receivers.isEmpty() && System.currentTimeMillis() < expiry) { Receiver receiver = receivers.firstElement(); if (receiver.getNextExpectedSeqNo() >= numtoSend || !exceptions.isEmpty()) { @@ -459,6 +511,20 @@ public class AMQ2149Test extends AutoFailTestSupport { if (!exceptions.isEmpty()) { exceptions.get(0).printStackTrace(); } + + for (Connection connection : connections) { + try { + connection.close(); + } catch (Exception ignored) {} + } + connections.clear(); + + LOG.info("Dangling threads: " + threads); + for (Thread dangling : threads) { + dangling.interrupt(); + dangling.join(10*1000); + } + assertTrue("No exceptions", exceptions.isEmpty()); }