diff --git a/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionPoolImpl.java b/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionPoolImpl.java index 9db6df5508..94bca0c6dd 100755 --- a/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionPoolImpl.java +++ b/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionPoolImpl.java @@ -31,9 +31,7 @@ import javax.resource.spi.UnavailableException; import javax.resource.spi.endpoint.MessageEndpoint; import org.apache.activemq.ActiveMQConnection; -import org.apache.activemq.ActiveMQQueueSession; import org.apache.activemq.ActiveMQSession; -import org.apache.activemq.ActiveMQTopicSession; import org.apache.activemq.command.MessageDispatch; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -76,11 +74,11 @@ public class ServerSessionPoolImpl implements ServerSessionPool { if (activationSpec.isUseRAManagedTransactionEnabled()) { // The RA will manage the transaction commit. endpoint = createEndpoint(null); - return new ServerSessionImpl(this, (ActiveMQSession)session, activeMQAsfEndpointWorker.workManager, endpoint, true, batchSize); + return new ServerSessionImpl(this, session, activeMQAsfEndpointWorker.workManager, endpoint, true, batchSize); } else { // Give the container an object to manage to transaction with. endpoint = createEndpoint(new LocalAndXATransaction(session.getTransactionContext())); - return new ServerSessionImpl(this, (ActiveMQSession)session, activeMQAsfEndpointWorker.workManager, endpoint, false, batchSize); + return new ServerSessionImpl(this, session, activeMQAsfEndpointWorker.workManager, endpoint, false, batchSize); } } catch (UnavailableException e) { // The container could be limiting us on the number of endpoints @@ -102,6 +100,7 @@ public class ServerSessionPoolImpl implements ServerSessionPool { /** */ + @Override public ServerSession getServerSession() throws JMSException { if (LOG.isDebugEnabled()) { LOG.debug("ServerSession requested."); @@ -226,12 +225,12 @@ public class ServerSessionPoolImpl implements ServerSessionPool { } try { ActiveMQSession session = (ActiveMQSession)ss.getSession(); - List l = session.getUnconsumedMessages(); + List l = session.getUnconsumedMessages(); if (!l.isEmpty()) { ActiveMQConnection connection = activeMQAsfEndpointWorker.getConnection(); if (connection != null) { - for (Iterator i = l.iterator(); i.hasNext();) { - MessageDispatch md = (MessageDispatch)i.next(); + for (Iterator i = l.iterator(); i.hasNext();) { + MessageDispatch md = i.next(); if (connection.hasDispatcher(md.getConsumerId())) { dispatchToSession(md); LOG.trace("on remove of {} redispatch of {}", session, md);