code cleanup

(cherry picked from commit 161730fda2)
This commit is contained in:
Christopher L. Shannon (cshannon) 2016-02-01 13:04:07 +00:00
parent 0b481903aa
commit f514b15713
1 changed files with 6 additions and 7 deletions

View File

@ -31,9 +31,7 @@ import javax.resource.spi.UnavailableException;
import javax.resource.spi.endpoint.MessageEndpoint; import javax.resource.spi.endpoint.MessageEndpoint;
import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQQueueSession;
import org.apache.activemq.ActiveMQSession; import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.ActiveMQTopicSession;
import org.apache.activemq.command.MessageDispatch; import org.apache.activemq.command.MessageDispatch;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -76,11 +74,11 @@ public class ServerSessionPoolImpl implements ServerSessionPool {
if (activationSpec.isUseRAManagedTransactionEnabled()) { if (activationSpec.isUseRAManagedTransactionEnabled()) {
// The RA will manage the transaction commit. // The RA will manage the transaction commit.
endpoint = createEndpoint(null); endpoint = createEndpoint(null);
return new ServerSessionImpl(this, (ActiveMQSession)session, activeMQAsfEndpointWorker.workManager, endpoint, true, batchSize); return new ServerSessionImpl(this, session, activeMQAsfEndpointWorker.workManager, endpoint, true, batchSize);
} else { } else {
// Give the container an object to manage to transaction with. // Give the container an object to manage to transaction with.
endpoint = createEndpoint(new LocalAndXATransaction(session.getTransactionContext())); endpoint = createEndpoint(new LocalAndXATransaction(session.getTransactionContext()));
return new ServerSessionImpl(this, (ActiveMQSession)session, activeMQAsfEndpointWorker.workManager, endpoint, false, batchSize); return new ServerSessionImpl(this, session, activeMQAsfEndpointWorker.workManager, endpoint, false, batchSize);
} }
} catch (UnavailableException e) { } catch (UnavailableException e) {
// The container could be limiting us on the number of endpoints // The container could be limiting us on the number of endpoints
@ -102,6 +100,7 @@ public class ServerSessionPoolImpl implements ServerSessionPool {
/** /**
*/ */
@Override
public ServerSession getServerSession() throws JMSException { public ServerSession getServerSession() throws JMSException {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("ServerSession requested."); LOG.debug("ServerSession requested.");
@ -226,12 +225,12 @@ public class ServerSessionPoolImpl implements ServerSessionPool {
} }
try { try {
ActiveMQSession session = (ActiveMQSession)ss.getSession(); ActiveMQSession session = (ActiveMQSession)ss.getSession();
List l = session.getUnconsumedMessages(); List<MessageDispatch> l = session.getUnconsumedMessages();
if (!l.isEmpty()) { if (!l.isEmpty()) {
ActiveMQConnection connection = activeMQAsfEndpointWorker.getConnection(); ActiveMQConnection connection = activeMQAsfEndpointWorker.getConnection();
if (connection != null) { if (connection != null) {
for (Iterator i = l.iterator(); i.hasNext();) { for (Iterator<MessageDispatch> i = l.iterator(); i.hasNext();) {
MessageDispatch md = (MessageDispatch)i.next(); MessageDispatch md = i.next();
if (connection.hasDispatcher(md.getConsumerId())) { if (connection.hasDispatcher(md.getConsumerId())) {
dispatchToSession(md); dispatchToSession(md);
LOG.trace("on remove of {} redispatch of {}", session, md); LOG.trace("on remove of {} redispatch of {}", session, md);