diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/SuppressReplyException.java b/activemq-broker/src/main/java/org/apache/activemq/broker/SuppressReplyException.java index eb54a12d2a..f2c6502106 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/SuppressReplyException.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/SuppressReplyException.java @@ -26,6 +26,14 @@ import java.io.IOException; * */ public class SuppressReplyException extends RuntimeException { + public SuppressReplyException(Throwable cause) { + super(cause); + } + + public SuppressReplyException(String reason) { + super(reason); + } + public SuppressReplyException(String reason, IOException cause) { super(reason, cause); } diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala index e46737931e..00260d9f79 100644 --- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala +++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala @@ -860,7 +860,7 @@ class DBManager(val parent:LevelDBStore) { def getMessage(x: MessageId):Message = { val id = Option(pendingStores.get(x)).flatMap(_.headOption).map(_.id).getOrElse(x) val locator = id.getDataLocator() - val msg = client.getMessage(locator) + val msg = client.getMessageWithRetry(locator) msg.setMessageId(id) msg } diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala index 15f7bb07e6..c0cedce89d 100755 --- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala +++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala @@ -52,6 +52,7 @@ import org.apache.activemq.leveldb.MessageRecord import org.apache.activemq.leveldb.EntryLocator import org.apache.activemq.leveldb.DataLocator import org.fusesource.hawtbuf.ByteArrayOutputStream +import org.apache.activemq.broker.SuppressReplyException /** * @author Hiram Chirino @@ -545,7 +546,7 @@ class LevelDBClient(store: LevelDBStore) { Thread.sleep(100); } } - throw failure; + throw new SuppressReplyException(failure); } try { func @@ -1244,7 +1245,7 @@ class LevelDBClient(store: LevelDBStore) { collectionCursor(collectionKey, encodeLong(seq)) { (key, value) => val seq = decodeLong(key) var locator = DataLocator(store, value.getValueLocation, value.getValueLength) - val msg = getMessage(locator) + val msg = getMessageWithRetry(locator) msg.getMessageId().setEntryLocator(EntryLocator(collectionKey, seq)) msg.getMessageId().setDataLocator(locator) msg.setRedeliveryCounter(decodeQueueEntryMeta(value)) @@ -1270,7 +1271,7 @@ class LevelDBClient(store: LevelDBStore) { func(XaAckRecord(collectionKey, seq, ack, sub)) } else { var locator = DataLocator(store, value.getValueLocation, value.getValueLength) - val msg = getMessage(locator) + val msg = getMessageWithRetry(locator) msg.getMessageId().setEntryLocator(EntryLocator(collectionKey, seq)) msg.getMessageId().setDataLocator(locator) func(msg) @@ -1287,6 +1288,22 @@ class LevelDBClient(store: LevelDBStore) { } } + def getMessageWithRetry(locator:AnyRef):Message = { + var retry = 0 + var rc = getMessage(locator); + while( rc == null ) { + if( retry > 10 ) + return null; + Thread.sleep(retry*10) + rc = getMessage(locator); + retry+=1 + } + if( retry > 0 ) { + info("Recovered from 'failed getMessage' on retry: "+retry) + } + rc + } + def getMessage(locator:AnyRef):Message = { assert(locator!=null) val buffer = locator match { diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala index 322656f72b..e4c7a0271a 100644 --- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala +++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala @@ -17,7 +17,7 @@ package org.apache.activemq.leveldb -import org.apache.activemq.broker.{LockableServiceSupport, BrokerServiceAware, ConnectionContext} +import org.apache.activemq.broker.{SuppressReplyException, LockableServiceSupport, BrokerServiceAware, ConnectionContext} import org.apache.activemq.command._ import org.apache.activemq.openwire.OpenWireFormat import org.apache.activemq.usage.SystemUsage @@ -186,7 +186,7 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P def check_running = { if( this.isStopped ) { - throw new IOException("Store has been stopped") + throw new SuppressReplyException("Store has been stopped") } } @@ -437,7 +437,7 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P def verify_running = { if( isStopping || isStopped ) { try { - throw new IOException("Not running") + throw new SuppressReplyException("Not running") } catch { case e:IOException => if( broker_service!=null ) {