This closes #223

This commit is contained in:
Clebert Suconic 2015-10-29 11:31:24 -04:00
commit 8aaed7568b
2 changed files with 8 additions and 3 deletions

View File

@ -642,7 +642,7 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
SessionContext context = createSessionChannel(name, username, password, xa, autoCommitSends, autoCommitAcks, preAcknowledge);
ClientSessionInternal session = new ClientSessionImpl(this, name, username, password, xa, autoCommitSends, autoCommitAcks, preAcknowledge, serverLocator.isBlockOnAcknowledge(), serverLocator.isAutoGroup(), ackBatchSize, serverLocator.getConsumerWindowSize(), serverLocator.getConsumerMaxRate(), serverLocator.getConfirmationWindowSize(), serverLocator.getProducerWindowSize(), serverLocator.getProducerMaxRate(), serverLocator.isBlockOnNonDurableSend(), serverLocator.isBlockOnDurableSend(), serverLocator.isCacheLargeMessagesClient(), serverLocator.getMinLargeMessageSize(), serverLocator.isCompressLargeMessage(), serverLocator.getInitialMessagePacketSize(), serverLocator.getGroupID(), context, orderedExecutorFactory.getExecutor(), orderedExecutorFactory.getExecutor());
ClientSessionInternal session = new ClientSessionImpl(this, name, username, password, xa, autoCommitSends, autoCommitAcks, preAcknowledge, serverLocator.isBlockOnAcknowledge(), serverLocator.isAutoGroup(), ackBatchSize, serverLocator.getConsumerWindowSize(), serverLocator.getConsumerMaxRate(), serverLocator.getConfirmationWindowSize(), serverLocator.getProducerWindowSize(), serverLocator.getProducerMaxRate(), serverLocator.isBlockOnNonDurableSend(), serverLocator.isBlockOnDurableSend(), serverLocator.isCacheLargeMessagesClient(), serverLocator.getMinLargeMessageSize(), serverLocator.isCompressLargeMessage(), serverLocator.getInitialMessagePacketSize(), serverLocator.getGroupID(), context, orderedExecutorFactory.getExecutor(), orderedExecutorFactory.getExecutor(), orderedExecutorFactory.getExecutor());
synchronized (sessions) {
if (closed || !clientProtocolManager.isAlive()) {

View File

@ -142,6 +142,8 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
private final ConfirmationWindowWarning confirmationWindowWarning;
private final Executor closeExecutor;
ClientSessionImpl(final ClientSessionFactoryInternal sessionFactory,
final String name,
final String username,
@ -167,7 +169,8 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
final String groupID,
final SessionContext sessionContext,
final Executor executor,
final Executor flowControlExecutor) throws ActiveMQException {
final Executor flowControlExecutor,
final Executor closeExecutor) throws ActiveMQException {
this.sessionFactory = sessionFactory;
this.name = name;
@ -223,6 +226,8 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
sessionContext.setSession(this);
confirmationWindowWarning = sessionFactory.getConfirmationWindowWarning();
this.closeExecutor = closeExecutor;
}
// ClientSession implementation
@ -768,7 +773,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
final ClientConsumerInternal consumer = getConsumer(context);
if (consumer != null) {
executor.execute(new Runnable() {
closeExecutor.execute(new Runnable() {
@Override
public void run() {
try {