Avoiding possible deadlock after Proton deliveries

This commit is contained in:
Clebert Suconic 2015-04-22 16:44:03 -04:00
parent 47edcd4014
commit e62112fbff
3 changed files with 50 additions and 2 deletions

View File

@ -44,6 +44,11 @@ public class ActiveMQProtonRemotingConnection extends AbstractRemotingConnection
this.amqpConnection = amqpConnection;
}
public Executor getExecutor()
{
return this.executor;
}
public ProtonProtocolManager getManager()
{
return manager;

View File

@ -16,6 +16,7 @@
*/
package org.apache.activemq.core.protocol.proton.plug;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import io.netty.buffer.ByteBuf;
@ -63,6 +64,18 @@ public class ActiveMQProtonConnectionCallback implements AMQPConnectionCallback
}
public Executor getExeuctor()
{
if (protonConnectionDelegate != null)
{
return protonConnectionDelegate.getExecutor();
}
else
{
return null;
}
}
@Override
public void setConnection(AMQPConnectionContext connection)
{

View File

@ -17,6 +17,8 @@
package org.apache.activemq.core.protocol.proton.plug;
import java.util.concurrent.Executor;
import io.netty.buffer.ByteBuf;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.transport.AmqpError;
@ -172,10 +174,38 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se
}
@Override
public void closeSender(Object brokerConsumer) throws Exception
public void closeSender(final Object brokerConsumer) throws Exception
{
Runnable runnable = new Runnable()
{
@Override
public void run()
{
try
{
((ServerConsumer) brokerConsumer).close(false);
}
catch (Exception e)
{
}
}
};
// Due to the nature of proton this could be happening within flushes from the queue-delivery (depending on how it happened on the protocol)
// to avoid deadlocks the close has to be done outside of the main thread on an executor
// otherwise you could get a deadlock
Executor executor = protonSPI.getExeuctor();
if (executor != null)
{
executor.execute(runnable);
}
else
{
runnable.run();
}
}
@Override
public ProtonJMessage encodeMessage(Object message, int deliveryCount) throws Exception