diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java index 3902fd3ca7..07a51e4c99 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java @@ -642,7 +642,7 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C SessionContext context = createSessionChannel(name, username, password, xa, autoCommitSends, autoCommitAcks, preAcknowledge); - ClientSessionInternal session = new ClientSessionImpl(this, name, username, password, xa, autoCommitSends, autoCommitAcks, preAcknowledge, serverLocator.isBlockOnAcknowledge(), serverLocator.isAutoGroup(), ackBatchSize, serverLocator.getConsumerWindowSize(), serverLocator.getConsumerMaxRate(), serverLocator.getConfirmationWindowSize(), serverLocator.getProducerWindowSize(), serverLocator.getProducerMaxRate(), serverLocator.isBlockOnNonDurableSend(), serverLocator.isBlockOnDurableSend(), serverLocator.isCacheLargeMessagesClient(), serverLocator.getMinLargeMessageSize(), serverLocator.isCompressLargeMessage(), serverLocator.getInitialMessagePacketSize(), serverLocator.getGroupID(), context, orderedExecutorFactory.getExecutor(), orderedExecutorFactory.getExecutor()); + ClientSessionInternal session = new ClientSessionImpl(this, name, username, password, xa, autoCommitSends, autoCommitAcks, preAcknowledge, serverLocator.isBlockOnAcknowledge(), serverLocator.isAutoGroup(), ackBatchSize, serverLocator.getConsumerWindowSize(), serverLocator.getConsumerMaxRate(), serverLocator.getConfirmationWindowSize(), serverLocator.getProducerWindowSize(), serverLocator.getProducerMaxRate(), serverLocator.isBlockOnNonDurableSend(), serverLocator.isBlockOnDurableSend(), serverLocator.isCacheLargeMessagesClient(), serverLocator.getMinLargeMessageSize(), serverLocator.isCompressLargeMessage(), serverLocator.getInitialMessagePacketSize(), serverLocator.getGroupID(), context, orderedExecutorFactory.getExecutor(), orderedExecutorFactory.getExecutor(), orderedExecutorFactory.getExecutor()); synchronized (sessions) { if (closed || !clientProtocolManager.isAlive()) { diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java index acb07256a1..93618af4ec 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java @@ -142,6 +142,8 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi private final ConfirmationWindowWarning confirmationWindowWarning; + private final Executor closeExecutor; + ClientSessionImpl(final ClientSessionFactoryInternal sessionFactory, final String name, final String username, @@ -167,7 +169,8 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi final String groupID, final SessionContext sessionContext, final Executor executor, - final Executor flowControlExecutor) throws ActiveMQException { + final Executor flowControlExecutor, + final Executor closeExecutor) throws ActiveMQException { this.sessionFactory = sessionFactory; this.name = name; @@ -223,6 +226,8 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi sessionContext.setSession(this); confirmationWindowWarning = sessionFactory.getConfirmationWindowWarning(); + + this.closeExecutor = closeExecutor; } // ClientSession implementation @@ -768,7 +773,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi final ClientConsumerInternal consumer = getConsumer(context); if (consumer != null) { - executor.execute(new Runnable() { + closeExecutor.execute(new Runnable() { @Override public void run() { try {