This closes 210 potential deadlock fix
This commit is contained in:
commit
fe849a4f1e
|
@ -44,6 +44,11 @@ public class ActiveMQProtonRemotingConnection extends AbstractRemotingConnection
|
|||
this.amqpConnection = amqpConnection;
|
||||
}
|
||||
|
||||
public Executor getExecutor()
|
||||
{
|
||||
return this.executor;
|
||||
}
|
||||
|
||||
public ProtonProtocolManager getManager()
|
||||
{
|
||||
return manager;
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
*/
|
||||
package org.apache.activemq.core.protocol.proton.plug;
|
||||
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
|
@ -63,6 +64,18 @@ public class ActiveMQProtonConnectionCallback implements AMQPConnectionCallback
|
|||
|
||||
}
|
||||
|
||||
public Executor getExeuctor()
|
||||
{
|
||||
if (protonConnectionDelegate != null)
|
||||
{
|
||||
return protonConnectionDelegate.getExecutor();
|
||||
}
|
||||
else
|
||||
{
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setConnection(AMQPConnectionContext connection)
|
||||
{
|
||||
|
|
|
@ -17,6 +17,8 @@
|
|||
package org.apache.activemq.core.protocol.proton.plug;
|
||||
|
||||
|
||||
import java.util.concurrent.Executor;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import org.apache.qpid.proton.amqp.Binary;
|
||||
import org.apache.qpid.proton.amqp.transport.AmqpError;
|
||||
|
@ -172,9 +174,37 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se
|
|||
}
|
||||
|
||||
@Override
|
||||
public void closeSender(Object brokerConsumer) throws Exception
|
||||
public void closeSender(final Object brokerConsumer) throws Exception
|
||||
{
|
||||
((ServerConsumer) brokerConsumer).close(false);
|
||||
Runnable runnable = new Runnable()
|
||||
{
|
||||
@Override
|
||||
public void run()
|
||||
{
|
||||
try
|
||||
{
|
||||
((ServerConsumer) brokerConsumer).close(false);
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
// Due to the nature of proton this could be happening within flushes from the queue-delivery (depending on how it happened on the protocol)
|
||||
// to avoid deadlocks the close has to be done outside of the main thread on an executor
|
||||
// otherwise you could get a deadlock
|
||||
Executor executor = protonSPI.getExeuctor();
|
||||
|
||||
if (executor != null)
|
||||
{
|
||||
executor.execute(runnable);
|
||||
}
|
||||
else
|
||||
{
|
||||
runnable.run();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
Loading…
Reference in New Issue