ARTEMIS-288 Start close method in separate executor
When server sends disconnect to the client, the ClientSession schedules a close task on it's ordered executor. Once the close method starts it's waits to check to see if all jobs in it's executor has completed. To do this it adds a job to it's ordered executor, once it is run it knows there is nothing more to do and thus is ready to close. However, this causes a deadlock as both jobs are running in the ordered executor and thus are both waiting on each other. The close eventually timesout which is why we see the logs as reported in the JIRA. This commit runs the close method in it's own ordered executor, thus preventing the two jobs blocking each other.
This commit is contained in:
parent
2cc2e29b11
commit
26898e4663
|
@ -642,7 +642,7 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
|
||||||
|
|
||||||
SessionContext context = createSessionChannel(name, username, password, xa, autoCommitSends, autoCommitAcks, preAcknowledge);
|
SessionContext context = createSessionChannel(name, username, password, xa, autoCommitSends, autoCommitAcks, preAcknowledge);
|
||||||
|
|
||||||
ClientSessionInternal session = new ClientSessionImpl(this, name, username, password, xa, autoCommitSends, autoCommitAcks, preAcknowledge, serverLocator.isBlockOnAcknowledge(), serverLocator.isAutoGroup(), ackBatchSize, serverLocator.getConsumerWindowSize(), serverLocator.getConsumerMaxRate(), serverLocator.getConfirmationWindowSize(), serverLocator.getProducerWindowSize(), serverLocator.getProducerMaxRate(), serverLocator.isBlockOnNonDurableSend(), serverLocator.isBlockOnDurableSend(), serverLocator.isCacheLargeMessagesClient(), serverLocator.getMinLargeMessageSize(), serverLocator.isCompressLargeMessage(), serverLocator.getInitialMessagePacketSize(), serverLocator.getGroupID(), context, orderedExecutorFactory.getExecutor(), orderedExecutorFactory.getExecutor());
|
ClientSessionInternal session = new ClientSessionImpl(this, name, username, password, xa, autoCommitSends, autoCommitAcks, preAcknowledge, serverLocator.isBlockOnAcknowledge(), serverLocator.isAutoGroup(), ackBatchSize, serverLocator.getConsumerWindowSize(), serverLocator.getConsumerMaxRate(), serverLocator.getConfirmationWindowSize(), serverLocator.getProducerWindowSize(), serverLocator.getProducerMaxRate(), serverLocator.isBlockOnNonDurableSend(), serverLocator.isBlockOnDurableSend(), serverLocator.isCacheLargeMessagesClient(), serverLocator.getMinLargeMessageSize(), serverLocator.isCompressLargeMessage(), serverLocator.getInitialMessagePacketSize(), serverLocator.getGroupID(), context, orderedExecutorFactory.getExecutor(), orderedExecutorFactory.getExecutor(), orderedExecutorFactory.getExecutor());
|
||||||
|
|
||||||
synchronized (sessions) {
|
synchronized (sessions) {
|
||||||
if (closed || !clientProtocolManager.isAlive()) {
|
if (closed || !clientProtocolManager.isAlive()) {
|
||||||
|
|
|
@ -142,6 +142,8 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
|
||||||
|
|
||||||
private final ConfirmationWindowWarning confirmationWindowWarning;
|
private final ConfirmationWindowWarning confirmationWindowWarning;
|
||||||
|
|
||||||
|
private final Executor closeExecutor;
|
||||||
|
|
||||||
ClientSessionImpl(final ClientSessionFactoryInternal sessionFactory,
|
ClientSessionImpl(final ClientSessionFactoryInternal sessionFactory,
|
||||||
final String name,
|
final String name,
|
||||||
final String username,
|
final String username,
|
||||||
|
@ -167,7 +169,8 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
|
||||||
final String groupID,
|
final String groupID,
|
||||||
final SessionContext sessionContext,
|
final SessionContext sessionContext,
|
||||||
final Executor executor,
|
final Executor executor,
|
||||||
final Executor flowControlExecutor) throws ActiveMQException {
|
final Executor flowControlExecutor,
|
||||||
|
final Executor closeExecutor) throws ActiveMQException {
|
||||||
this.sessionFactory = sessionFactory;
|
this.sessionFactory = sessionFactory;
|
||||||
|
|
||||||
this.name = name;
|
this.name = name;
|
||||||
|
@ -223,6 +226,8 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
|
||||||
sessionContext.setSession(this);
|
sessionContext.setSession(this);
|
||||||
|
|
||||||
confirmationWindowWarning = sessionFactory.getConfirmationWindowWarning();
|
confirmationWindowWarning = sessionFactory.getConfirmationWindowWarning();
|
||||||
|
|
||||||
|
this.closeExecutor = closeExecutor;
|
||||||
}
|
}
|
||||||
|
|
||||||
// ClientSession implementation
|
// ClientSession implementation
|
||||||
|
@ -768,7 +773,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
|
||||||
final ClientConsumerInternal consumer = getConsumer(context);
|
final ClientConsumerInternal consumer = getConsumer(context);
|
||||||
|
|
||||||
if (consumer != null) {
|
if (consumer != null) {
|
||||||
executor.execute(new Runnable() {
|
closeExecutor.execute(new Runnable() {
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
try {
|
try {
|
||||||
|
|
Loading…
Reference in New Issue