From b0e91d47f5fced59c89a34d993f4d87c7986b04b Mon Sep 17 00:00:00 2001 From: Hiram Chirino Date: Mon, 25 Nov 2013 13:17:58 -0500 Subject: [PATCH] Have the leveldb store thorw SuppressReplyExceptions instead of IOExceptions so that the clients retry try the operations instead of giving up. Also retry the problemantic getMessage() call which seems to fail at times. --- .../broker/SuppressReplyException.java | 8 +++++++ .../apache/activemq/leveldb/DBManager.scala | 2 +- .../activemq/leveldb/LevelDBClient.scala | 23 ++++++++++++++++--- .../activemq/leveldb/LevelDBStore.scala | 6 ++--- 4 files changed, 32 insertions(+), 7 deletions(-) diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/SuppressReplyException.java b/activemq-broker/src/main/java/org/apache/activemq/broker/SuppressReplyException.java index eb54a12d2a..f2c6502106 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/SuppressReplyException.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/SuppressReplyException.java @@ -26,6 +26,14 @@ import java.io.IOException; * */ public class SuppressReplyException extends RuntimeException { + public SuppressReplyException(Throwable cause) { + super(cause); + } + + public SuppressReplyException(String reason) { + super(reason); + } + public SuppressReplyException(String reason, IOException cause) { super(reason, cause); } diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala index e46737931e..00260d9f79 100644 --- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala +++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala @@ -860,7 +860,7 @@ class DBManager(val parent:LevelDBStore) { def getMessage(x: MessageId):Message = { val id = Option(pendingStores.get(x)).flatMap(_.headOption).map(_.id).getOrElse(x) val locator = id.getDataLocator() - val msg = client.getMessage(locator) + val msg = client.getMessageWithRetry(locator) msg.setMessageId(id) msg } diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala index 15f7bb07e6..c0cedce89d 100755 --- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala +++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala @@ -52,6 +52,7 @@ import org.apache.activemq.leveldb.MessageRecord import org.apache.activemq.leveldb.EntryLocator import org.apache.activemq.leveldb.DataLocator import org.fusesource.hawtbuf.ByteArrayOutputStream +import org.apache.activemq.broker.SuppressReplyException /** * @author Hiram Chirino @@ -545,7 +546,7 @@ class LevelDBClient(store: LevelDBStore) { Thread.sleep(100); } } - throw failure; + throw new SuppressReplyException(failure); } try { func @@ -1244,7 +1245,7 @@ class LevelDBClient(store: LevelDBStore) { collectionCursor(collectionKey, encodeLong(seq)) { (key, value) => val seq = decodeLong(key) var locator = DataLocator(store, value.getValueLocation, value.getValueLength) - val msg = getMessage(locator) + val msg = getMessageWithRetry(locator) msg.getMessageId().setEntryLocator(EntryLocator(collectionKey, seq)) msg.getMessageId().setDataLocator(locator) msg.setRedeliveryCounter(decodeQueueEntryMeta(value)) @@ -1270,7 +1271,7 @@ class LevelDBClient(store: LevelDBStore) { func(XaAckRecord(collectionKey, seq, ack, sub)) } else { var locator = DataLocator(store, value.getValueLocation, value.getValueLength) - val msg = getMessage(locator) + val msg = getMessageWithRetry(locator) msg.getMessageId().setEntryLocator(EntryLocator(collectionKey, seq)) msg.getMessageId().setDataLocator(locator) func(msg) @@ -1287,6 +1288,22 @@ class LevelDBClient(store: LevelDBStore) { } } + def getMessageWithRetry(locator:AnyRef):Message = { + var retry = 0 + var rc = getMessage(locator); + while( rc == null ) { + if( retry > 10 ) + return null; + Thread.sleep(retry*10) + rc = getMessage(locator); + retry+=1 + } + if( retry > 0 ) { + info("Recovered from 'failed getMessage' on retry: "+retry) + } + rc + } + def getMessage(locator:AnyRef):Message = { assert(locator!=null) val buffer = locator match { diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala index 322656f72b..e4c7a0271a 100644 --- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala +++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala @@ -17,7 +17,7 @@ package org.apache.activemq.leveldb -import org.apache.activemq.broker.{LockableServiceSupport, BrokerServiceAware, ConnectionContext} +import org.apache.activemq.broker.{SuppressReplyException, LockableServiceSupport, BrokerServiceAware, ConnectionContext} import org.apache.activemq.command._ import org.apache.activemq.openwire.OpenWireFormat import org.apache.activemq.usage.SystemUsage @@ -186,7 +186,7 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P def check_running = { if( this.isStopped ) { - throw new IOException("Store has been stopped") + throw new SuppressReplyException("Store has been stopped") } } @@ -437,7 +437,7 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P def verify_running = { if( isStopping || isStopped ) { try { - throw new IOException("Not running") + throw new SuppressReplyException("Not running") } catch { case e:IOException => if( broker_service!=null ) {