merging patch to fix casting issues

Thanks for Marc Breslow for the patch

This closes #166
This commit is contained in:
Christopher L. Shannon (cshannon) 2016-02-01 12:58:17 +00:00
commit 9361bc6a0f
3 changed files with 7 additions and 23 deletions

View File

@ -108,12 +108,8 @@ public class MemoryMessageStore extends AbstractMessageStore {
// here
synchronized (messageTable) {
for (Iterator<Message> iter = messageTable.values().iterator(); iter.hasNext();) {
Object msg = iter.next();
if (msg.getClass() == MessageId.class) {
listener.recoverMessageReference((MessageId)msg);
} else {
listener.recoverMessage((Message)msg);
}
Message msg = iter.next();
listener.recoverMessage(msg);
}
}
}

View File

@ -72,12 +72,8 @@ class MemoryTopicSub {
synchronized void recoverSubscription(MessageRecoveryListener listener) throws Exception {
for (Iterator<Entry<MessageId, Message>> iter = map.entrySet().iterator(); iter.hasNext();) {
Entry<MessageId, Message> entry = iter.next();
Object msg = entry.getValue();
if (msg.getClass() == MessageId.class) {
listener.recoverMessageReference((MessageId)msg);
} else {
listener.recoverMessage((Message)msg);
}
Message msg = entry.getValue();
listener.recoverMessage(msg);
}
}
@ -91,13 +87,9 @@ class MemoryTopicSub {
Entry<MessageId, Message> entry = iter.next();
if (pastLackBatch) {
count++;
Object msg = entry.getValue();
lastId = (MessageId)entry.getKey();
if (msg.getClass() == MessageId.class) {
listener.recoverMessageReference((MessageId)msg);
} else {
listener.recoverMessage((Message)msg);
}
Message msg = entry.getValue();
lastId = entry.getKey();
listener.recoverMessage(msg);
} else {
pastLackBatch = entry.getKey().equals(lastBatch);
}

View File

@ -265,10 +265,6 @@ public class ServerSessionPoolImpl implements ServerSessionPool {
ActiveMQSession session = null;
if (s instanceof ActiveMQSession) {
session = (ActiveMQSession) s;
} else if (s instanceof ActiveMQQueueSession) {
session = (ActiveMQSession) s;
} else if (s instanceof ActiveMQTopicSession) {
session = (ActiveMQSession) s;
} else {
activeMQAsfEndpointWorker.getConnection()
.onAsyncException(new JMSException(