From 40b9ac0a20390731314bb59fb755b9e435167bcd Mon Sep 17 00:00:00 2001 From: Timothy Bish Date: Fri, 24 Mar 2017 17:22:14 -0400 Subject: [PATCH] ARTEMIS-1069 Fix some thread unsafe usages of proton Unsafe modification of proton resources outside the connection lock could lead to corruption in the transport work list and other internal state data. --- .../amqp/proton/AMQPConnectionContext.java | 27 +++++++++++-------- .../amqp/proton/AMQPSessionContext.java | 22 ++++++++++----- .../proton/ProtonServerReceiverContext.java | 10 +++---- 3 files changed, 37 insertions(+), 22 deletions(-) diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java index d6cab99c9c..25c4b562a1 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java @@ -16,6 +16,12 @@ */ package org.apache.activemq.artemis.protocol.amqp.proton; +import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.FAILOVER_SERVER_LIST; +import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.HOSTNAME; +import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.NETWORK_HOST; +import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.PORT; +import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.SCHEME; + import java.net.URI; import java.util.Arrays; import java.util.HashMap; @@ -50,12 +56,6 @@ import org.jboss.logging.Logger; import io.netty.buffer.ByteBuf; -import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.FAILOVER_SERVER_LIST; -import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.HOSTNAME; -import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.NETWORK_HOST; -import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.PORT; -import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.SCHEME; - public class AMQPConnectionContext extends ProtonInitializable { private static final Logger log = Logger.getLogger(AMQPConnectionContext.class); @@ -210,7 +210,6 @@ public class AMQPConnectionContext extends ProtonInitializable { } else { Sender sender = (Sender) link; protonSession.addSender(sender); - sender.offer(1); } } @@ -421,8 +420,10 @@ public class AMQPConnectionContext extends ProtonInitializable { @Override public void onRemoteClose(Link link) throws Exception { - link.close(); - link.free(); + synchronized (getLock()) { + link.close(); + link.free(); + } ProtonDeliveryHandler linkContext = (ProtonDeliveryHandler) link.getContext(); if (linkContext != null) { linkContext.close(true); @@ -431,8 +432,12 @@ public class AMQPConnectionContext extends ProtonInitializable { @Override public void onRemoteDetach(Link link) throws Exception { - link.detach(); - link.free(); + synchronized (getLock()) { + link.detach(); + link.free(); + } + + flush(); } @Override diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java index ccc4a6ce27..64b25311c3 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java @@ -147,8 +147,10 @@ public class AMQPSessionContext extends ProtonInitializable { coordinator.setCapabilities(Symbol.getSymbol("amqp:local-transactions"), Symbol.getSymbol("amqp:multi-txns-per-ssn"), Symbol.getSymbol("amqp:multi-ssns-per-txn")); receiver.setContext(transactionHandler); - receiver.open(); - receiver.flow(ProtonTransactionHandler.DEFAULT_COORDINATOR_CREDIT); + synchronized (connection.getLock()) { + receiver.open(); + receiver.flow(ProtonTransactionHandler.DEFAULT_COORDINATOR_CREDIT); + } } public void addSender(Sender sender) throws Exception { @@ -161,13 +163,17 @@ public class AMQPSessionContext extends ProtonInitializable { senders.put(sender, protonSender); serverSenders.put(protonSender.getBrokerConsumer(), protonSender); sender.setContext(protonSender); - sender.open(); + synchronized (connection.getLock()) { + sender.open(); + } protonSender.start(); } catch (ActiveMQAMQPException e) { senders.remove(sender); sender.setSource(null); sender.setCondition(new ErrorCondition(e.getAmqpError(), e.getMessage())); - sender.close(); + synchronized (connection.getLock()) { + sender.close(); + } } } @@ -185,12 +191,16 @@ public class AMQPSessionContext extends ProtonInitializable { protonReceiver.initialise(); receivers.put(receiver, protonReceiver); receiver.setContext(protonReceiver); - receiver.open(); + synchronized (connection.getLock()) { + receiver.open(); + } } catch (ActiveMQAMQPException e) { receivers.remove(receiver); receiver.setTarget(null); receiver.setCondition(new ErrorCondition(e.getAmqpError(), e.getMessage())); - receiver.close(); + synchronized (connection.getLock()) { + receiver.close(); + } } } } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java index 54467cfd6c..34a522f27c 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java @@ -52,7 +52,7 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements /* The maximum number of credits we will allocate to clients. This number is also used by the broker when refresh client credits. - */ + */ private static int maxCreditAllocation = 100; // Used by the broker to decide when to refresh clients credit. This is not used when client requests credit. @@ -170,8 +170,10 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements condition.setCondition(Symbol.valueOf("failed")); condition.setDescription(e.getMessage()); rejected.setError(condition); - delivery.disposition(rejected); - delivery.settle(); + synchronized (connection.getLock()) { + delivery.disposition(rejected); + delivery.settle(); + } } } @@ -204,7 +206,6 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements } connection.flush(); } - } public void drain(int credits) { @@ -221,5 +222,4 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements public boolean isDraining() { return receiver.draining(); } - }