mirror of https://github.com/apache/activemq.git
Have the leveldb store thorw SuppressReplyExceptions instead of IOExceptions so that the clients retry try the operations instead of giving up. Also retry the problemantic getMessage() call which seems to fail at times.
This commit is contained in:
parent
00cb9a5668
commit
b0e91d47f5
|
@ -26,6 +26,14 @@ import java.io.IOException;
|
|||
*
|
||||
*/
|
||||
public class SuppressReplyException extends RuntimeException {
|
||||
public SuppressReplyException(Throwable cause) {
|
||||
super(cause);
|
||||
}
|
||||
|
||||
public SuppressReplyException(String reason) {
|
||||
super(reason);
|
||||
}
|
||||
|
||||
public SuppressReplyException(String reason, IOException cause) {
|
||||
super(reason, cause);
|
||||
}
|
||||
|
|
|
@ -860,7 +860,7 @@ class DBManager(val parent:LevelDBStore) {
|
|||
def getMessage(x: MessageId):Message = {
|
||||
val id = Option(pendingStores.get(x)).flatMap(_.headOption).map(_.id).getOrElse(x)
|
||||
val locator = id.getDataLocator()
|
||||
val msg = client.getMessage(locator)
|
||||
val msg = client.getMessageWithRetry(locator)
|
||||
msg.setMessageId(id)
|
||||
msg
|
||||
}
|
||||
|
|
|
@ -52,6 +52,7 @@ import org.apache.activemq.leveldb.MessageRecord
|
|||
import org.apache.activemq.leveldb.EntryLocator
|
||||
import org.apache.activemq.leveldb.DataLocator
|
||||
import org.fusesource.hawtbuf.ByteArrayOutputStream
|
||||
import org.apache.activemq.broker.SuppressReplyException
|
||||
|
||||
/**
|
||||
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
|
||||
|
@ -545,7 +546,7 @@ class LevelDBClient(store: LevelDBStore) {
|
|||
Thread.sleep(100);
|
||||
}
|
||||
}
|
||||
throw failure;
|
||||
throw new SuppressReplyException(failure);
|
||||
}
|
||||
try {
|
||||
func
|
||||
|
@ -1244,7 +1245,7 @@ class LevelDBClient(store: LevelDBStore) {
|
|||
collectionCursor(collectionKey, encodeLong(seq)) { (key, value) =>
|
||||
val seq = decodeLong(key)
|
||||
var locator = DataLocator(store, value.getValueLocation, value.getValueLength)
|
||||
val msg = getMessage(locator)
|
||||
val msg = getMessageWithRetry(locator)
|
||||
msg.getMessageId().setEntryLocator(EntryLocator(collectionKey, seq))
|
||||
msg.getMessageId().setDataLocator(locator)
|
||||
msg.setRedeliveryCounter(decodeQueueEntryMeta(value))
|
||||
|
@ -1270,7 +1271,7 @@ class LevelDBClient(store: LevelDBStore) {
|
|||
func(XaAckRecord(collectionKey, seq, ack, sub))
|
||||
} else {
|
||||
var locator = DataLocator(store, value.getValueLocation, value.getValueLength)
|
||||
val msg = getMessage(locator)
|
||||
val msg = getMessageWithRetry(locator)
|
||||
msg.getMessageId().setEntryLocator(EntryLocator(collectionKey, seq))
|
||||
msg.getMessageId().setDataLocator(locator)
|
||||
func(msg)
|
||||
|
@ -1287,6 +1288,22 @@ class LevelDBClient(store: LevelDBStore) {
|
|||
}
|
||||
}
|
||||
|
||||
def getMessageWithRetry(locator:AnyRef):Message = {
|
||||
var retry = 0
|
||||
var rc = getMessage(locator);
|
||||
while( rc == null ) {
|
||||
if( retry > 10 )
|
||||
return null;
|
||||
Thread.sleep(retry*10)
|
||||
rc = getMessage(locator);
|
||||
retry+=1
|
||||
}
|
||||
if( retry > 0 ) {
|
||||
info("Recovered from 'failed getMessage' on retry: "+retry)
|
||||
}
|
||||
rc
|
||||
}
|
||||
|
||||
def getMessage(locator:AnyRef):Message = {
|
||||
assert(locator!=null)
|
||||
val buffer = locator match {
|
||||
|
|
|
@ -17,7 +17,7 @@
|
|||
|
||||
package org.apache.activemq.leveldb
|
||||
|
||||
import org.apache.activemq.broker.{LockableServiceSupport, BrokerServiceAware, ConnectionContext}
|
||||
import org.apache.activemq.broker.{SuppressReplyException, LockableServiceSupport, BrokerServiceAware, ConnectionContext}
|
||||
import org.apache.activemq.command._
|
||||
import org.apache.activemq.openwire.OpenWireFormat
|
||||
import org.apache.activemq.usage.SystemUsage
|
||||
|
@ -186,7 +186,7 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
|
|||
|
||||
def check_running = {
|
||||
if( this.isStopped ) {
|
||||
throw new IOException("Store has been stopped")
|
||||
throw new SuppressReplyException("Store has been stopped")
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -437,7 +437,7 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
|
|||
def verify_running = {
|
||||
if( isStopping || isStopped ) {
|
||||
try {
|
||||
throw new IOException("Not running")
|
||||
throw new SuppressReplyException("Not running")
|
||||
} catch {
|
||||
case e:IOException =>
|
||||
if( broker_service!=null ) {
|
||||
|
|
Loading…
Reference in New Issue