From 26898e4663ce00468ec9cebf00c57d72658dd191 Mon Sep 17 00:00:00 2001 From: Martyn Taylor Date: Thu, 29 Oct 2015 12:50:08 +0000 Subject: [PATCH] ARTEMIS-288 Start close method in separate executor When server sends disconnect to the client, the ClientSession schedules a close task on it's ordered executor. Once the close method starts it's waits to check to see if all jobs in it's executor has completed. To do this it adds a job to it's ordered executor, once it is run it knows there is nothing more to do and thus is ready to close. However, this causes a deadlock as both jobs are running in the ordered executor and thus are both waiting on each other. The close eventually timesout which is why we see the logs as reported in the JIRA. This commit runs the close method in it's own ordered executor, thus preventing the two jobs blocking each other. --- .../core/client/impl/ClientSessionFactoryImpl.java | 2 +- .../artemis/core/client/impl/ClientSessionImpl.java | 9 +++++++-- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java index 3902fd3ca7..07a51e4c99 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java @@ -642,7 +642,7 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C SessionContext context = createSessionChannel(name, username, password, xa, autoCommitSends, autoCommitAcks, preAcknowledge); - ClientSessionInternal session = new ClientSessionImpl(this, name, username, password, xa, autoCommitSends, autoCommitAcks, preAcknowledge, serverLocator.isBlockOnAcknowledge(), serverLocator.isAutoGroup(), ackBatchSize, serverLocator.getConsumerWindowSize(), serverLocator.getConsumerMaxRate(), serverLocator.getConfirmationWindowSize(), serverLocator.getProducerWindowSize(), serverLocator.getProducerMaxRate(), serverLocator.isBlockOnNonDurableSend(), serverLocator.isBlockOnDurableSend(), serverLocator.isCacheLargeMessagesClient(), serverLocator.getMinLargeMessageSize(), serverLocator.isCompressLargeMessage(), serverLocator.getInitialMessagePacketSize(), serverLocator.getGroupID(), context, orderedExecutorFactory.getExecutor(), orderedExecutorFactory.getExecutor()); + ClientSessionInternal session = new ClientSessionImpl(this, name, username, password, xa, autoCommitSends, autoCommitAcks, preAcknowledge, serverLocator.isBlockOnAcknowledge(), serverLocator.isAutoGroup(), ackBatchSize, serverLocator.getConsumerWindowSize(), serverLocator.getConsumerMaxRate(), serverLocator.getConfirmationWindowSize(), serverLocator.getProducerWindowSize(), serverLocator.getProducerMaxRate(), serverLocator.isBlockOnNonDurableSend(), serverLocator.isBlockOnDurableSend(), serverLocator.isCacheLargeMessagesClient(), serverLocator.getMinLargeMessageSize(), serverLocator.isCompressLargeMessage(), serverLocator.getInitialMessagePacketSize(), serverLocator.getGroupID(), context, orderedExecutorFactory.getExecutor(), orderedExecutorFactory.getExecutor(), orderedExecutorFactory.getExecutor()); synchronized (sessions) { if (closed || !clientProtocolManager.isAlive()) { diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java index acb07256a1..93618af4ec 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java @@ -142,6 +142,8 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi private final ConfirmationWindowWarning confirmationWindowWarning; + private final Executor closeExecutor; + ClientSessionImpl(final ClientSessionFactoryInternal sessionFactory, final String name, final String username, @@ -167,7 +169,8 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi final String groupID, final SessionContext sessionContext, final Executor executor, - final Executor flowControlExecutor) throws ActiveMQException { + final Executor flowControlExecutor, + final Executor closeExecutor) throws ActiveMQException { this.sessionFactory = sessionFactory; this.name = name; @@ -223,6 +226,8 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi sessionContext.setSession(this); confirmationWindowWarning = sessionFactory.getConfirmationWindowWarning(); + + this.closeExecutor = closeExecutor; } // ClientSession implementation @@ -768,7 +773,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi final ClientConsumerInternal consumer = getConsumer(context); if (consumer != null) { - executor.execute(new Runnable() { + closeExecutor.execute(new Runnable() { @Override public void run() { try {