From 8394fec104e015c512cb5b323009e535707b4f91 Mon Sep 17 00:00:00 2001 From: Martyn Taylor Date: Mon, 20 Mar 2017 17:16:10 +0000 Subject: [PATCH 1/2] ARTEMIS-1051 Make ServerSession send thread safe --- .../core/server/impl/ServerSessionImpl.java | 185 +++++++++--------- 1 file changed, 96 insertions(+), 89 deletions(-) diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java index 08464e6d2f..af1c532d2b 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java @@ -182,6 +182,8 @@ public class ServerSessionImpl implements ServerSession, FailureListener { private Set closeables; + private final Object sendLock = new Object(); + public ServerSessionImpl(final String name, final String username, final String password, @@ -1290,54 +1292,56 @@ public class ServerSessionImpl implements ServerSession, FailureListener { final boolean direct, boolean noAutoCreateQueue) throws Exception { - // If the protocol doesn't support flow control, we have no choice other than fail the communication - if (!this.getRemotingConnection().isSupportsFlowControl() && pagingManager.isDiskFull()) { - ActiveMQIOErrorException exception = ActiveMQMessageBundle.BUNDLE.diskBeyondLimit(); - this.getRemotingConnection().fail(exception); - throw exception; + synchronized (sendLock) { + // If the protocol doesn't support flow control, we have no choice other than fail the communication + if (!this.getRemotingConnection().isSupportsFlowControl() && pagingManager.isDiskFull()) { + ActiveMQIOErrorException exception = ActiveMQMessageBundle.BUNDLE.diskBeyondLimit(); + this.getRemotingConnection().fail(exception); + throw exception; + } + + RoutingStatus result = RoutingStatus.OK; + //large message may come from StompSession directly, in which + //case the id header already generated. + if (!message.isLargeMessage()) { + long id = storageManager.generateID(); + // This will re-encode the message + message.setMessageID(id); + } + + if (server.getConfiguration().isPopulateValidatedUser() && validatedUser != null) { + message.putStringProperty(Message.HDR_VALIDATED_USER, SimpleString.toSimpleString(validatedUser)); + } + + SimpleString address = message.getAddressSimpleString(); + + if (defaultAddress == null && address != null) { + defaultAddress = address; + } + + if (address == null) { + // We don't want to force a re-encode when the message gets sent to the consumer + message.setAddress(defaultAddress); + } + + if (logger.isTraceEnabled()) { + logger.trace("send(message=" + message + ", direct=" + direct + ") being called"); + } + + if (message.getAddress() == null) { + // This could happen with some tests that are ignoring messages + throw ActiveMQMessageBundle.BUNDLE.noAddress(); + } + + if (message.getAddressSimpleString().equals(managementAddress)) { + // It's a management message + + handleManagementMessage(tx, message, direct); + } else { + result = doSend(tx, message, address, direct, noAutoCreateQueue); + } + return result; } - - RoutingStatus result = RoutingStatus.OK; - //large message may come from StompSession directly, in which - //case the id header already generated. - if (!message.isLargeMessage()) { - long id = storageManager.generateID(); - // This will re-encode the message - message.setMessageID(id); - } - - if (server.getConfiguration().isPopulateValidatedUser() && validatedUser != null) { - message.putStringProperty(Message.HDR_VALIDATED_USER, SimpleString.toSimpleString(validatedUser)); - } - - SimpleString address = message.getAddressSimpleString(); - - if (defaultAddress == null && address != null) { - defaultAddress = address; - } - - if (address == null) { - // We don't want to force a re-encode when the message gets sent to the consumer - message.setAddress(defaultAddress); - } - - if (logger.isTraceEnabled()) { - logger.trace("send(message=" + message + ", direct=" + direct + ") being called"); - } - - if (message.getAddress() == null) { - // This could happen with some tests that are ignoring messages - throw ActiveMQMessageBundle.BUNDLE.noAddress(); - } - - if (message.getAddressSimpleString().equals(managementAddress)) { - // It's a management message - - handleManagementMessage(tx, message, direct); - } else { - result = doSend(tx, message, address, direct, noAutoCreateQueue); - } - return result; } @@ -1616,55 +1620,58 @@ public class ServerSessionImpl implements ServerSession, FailureListener { final SimpleString originalAddress, final boolean direct, final boolean noAutoCreateQueue) throws Exception { - RoutingStatus result = RoutingStatus.OK; - RoutingType routingType = msg.getRouteType(); + synchronized (sendLock) { + RoutingStatus result = RoutingStatus.OK; - /* TODO-now: How to address here with AMQP? - if (originalAddress != null) { - if (originalAddress.toString().startsWith("anycast:")) { - routingType = RoutingType.ANYCAST; - } else if (originalAddress.toString().startsWith("multicast:")) { - routingType = RoutingType.MULTICAST; + RoutingType routingType = msg.getRouteType(); + + /* TODO-now: How to address here with AMQP? + if (originalAddress != null) { + if (originalAddress.toString().startsWith("anycast:")) { + routingType = RoutingType.ANYCAST; + } else if (originalAddress.toString().startsWith("multicast:")) { + routingType = RoutingType.MULTICAST; + } + } */ + + Pair art = getAddressAndRoutingType(msg.getAddressSimpleString(), routingType); + + // Consumer + // check the user has write access to this address. + try { + securityCheck(art.getA(), CheckType.SEND, this); + } catch (ActiveMQException e) { + if (!autoCommitSends && tx != null) { + tx.markAsRollbackOnly(e); + } + throw e; } - } */ - Pair art = getAddressAndRoutingType(msg.getAddressSimpleString(), routingType); - - // Consumer - // check the user has write access to this address. - try { - securityCheck(art.getA(), CheckType.SEND, this); - } catch (ActiveMQException e) { - if (!autoCommitSends && tx != null) { - tx.markAsRollbackOnly(e); - } - throw e; - } - - if (tx == null || autoCommitSends) { - } else { - routingContext.setTransaction(tx); - } - - try { - routingContext.setAddress(art.getA()); - routingContext.setRoutingType(art.getB()); - - result = postOffice.route(msg, routingContext, direct); - - Pair value = targetAddressInfos.get(msg.getAddressSimpleString()); - - if (value == null) { - targetAddressInfos.put(msg.getAddressSimpleString(), new Pair<>(msg.getUserID(), new AtomicLong(1))); + if (tx == null || autoCommitSends) { } else { - value.setA(msg.getUserID()); - value.getB().incrementAndGet(); + routingContext.setTransaction(tx); } - } finally { - routingContext.clear(); + + try { + routingContext.setAddress(art.getA()); + routingContext.setRoutingType(art.getB()); + + result = postOffice.route(msg, routingContext, direct); + + Pair value = targetAddressInfos.get(msg.getAddressSimpleString()); + + if (value == null) { + targetAddressInfos.put(msg.getAddressSimpleString(), new Pair<>(msg.getUserID(), new AtomicLong(1))); + } else { + value.setA(msg.getUserID()); + value.getB().incrementAndGet(); + } + } finally { + routingContext.clear(); + } + return result; } - return result; } @Override From e2b2e247d9925d4e0ff033ff63b7ef18b4e4a4f2 Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Mon, 20 Mar 2017 21:56:07 -0400 Subject: [PATCH 2/2] ARTEMIS-1051 using ServerSession's own lock --- .../core/server/impl/ServerSessionImpl.java | 192 +++++++++--------- 1 file changed, 93 insertions(+), 99 deletions(-) diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java index af1c532d2b..97a42490d6 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java @@ -182,8 +182,6 @@ public class ServerSessionImpl implements ServerSession, FailureListener { private Set closeables; - private final Object sendLock = new Object(); - public ServerSessionImpl(final String name, final String username, final String password, @@ -1287,61 +1285,59 @@ public class ServerSessionImpl implements ServerSession, FailureListener { } @Override - public RoutingStatus send(Transaction tx, - final Message message, - final boolean direct, - boolean noAutoCreateQueue) throws Exception { + public synchronized RoutingStatus send(Transaction tx, + final Message message, + final boolean direct, + boolean noAutoCreateQueue) throws Exception { - synchronized (sendLock) { - // If the protocol doesn't support flow control, we have no choice other than fail the communication - if (!this.getRemotingConnection().isSupportsFlowControl() && pagingManager.isDiskFull()) { - ActiveMQIOErrorException exception = ActiveMQMessageBundle.BUNDLE.diskBeyondLimit(); - this.getRemotingConnection().fail(exception); - throw exception; - } - - RoutingStatus result = RoutingStatus.OK; - //large message may come from StompSession directly, in which - //case the id header already generated. - if (!message.isLargeMessage()) { - long id = storageManager.generateID(); - // This will re-encode the message - message.setMessageID(id); - } - - if (server.getConfiguration().isPopulateValidatedUser() && validatedUser != null) { - message.putStringProperty(Message.HDR_VALIDATED_USER, SimpleString.toSimpleString(validatedUser)); - } - - SimpleString address = message.getAddressSimpleString(); - - if (defaultAddress == null && address != null) { - defaultAddress = address; - } - - if (address == null) { - // We don't want to force a re-encode when the message gets sent to the consumer - message.setAddress(defaultAddress); - } - - if (logger.isTraceEnabled()) { - logger.trace("send(message=" + message + ", direct=" + direct + ") being called"); - } - - if (message.getAddress() == null) { - // This could happen with some tests that are ignoring messages - throw ActiveMQMessageBundle.BUNDLE.noAddress(); - } - - if (message.getAddressSimpleString().equals(managementAddress)) { - // It's a management message - - handleManagementMessage(tx, message, direct); - } else { - result = doSend(tx, message, address, direct, noAutoCreateQueue); - } - return result; + // If the protocol doesn't support flow control, we have no choice other than fail the communication + if (!this.getRemotingConnection().isSupportsFlowControl() && pagingManager.isDiskFull()) { + ActiveMQIOErrorException exception = ActiveMQMessageBundle.BUNDLE.diskBeyondLimit(); + this.getRemotingConnection().fail(exception); + throw exception; } + + RoutingStatus result = RoutingStatus.OK; + //large message may come from StompSession directly, in which + //case the id header already generated. + if (!message.isLargeMessage()) { + long id = storageManager.generateID(); + // This will re-encode the message + message.setMessageID(id); + } + + if (server.getConfiguration().isPopulateValidatedUser() && validatedUser != null) { + message.putStringProperty(Message.HDR_VALIDATED_USER, SimpleString.toSimpleString(validatedUser)); + } + + SimpleString address = message.getAddressSimpleString(); + + if (defaultAddress == null && address != null) { + defaultAddress = address; + } + + if (address == null) { + // We don't want to force a re-encode when the message gets sent to the consumer + message.setAddress(defaultAddress); + } + + if (logger.isTraceEnabled()) { + logger.trace("send(message=" + message + ", direct=" + direct + ") being called"); + } + + if (message.getAddress() == null) { + // This could happen with some tests that are ignoring messages + throw ActiveMQMessageBundle.BUNDLE.noAddress(); + } + + if (message.getAddressSimpleString().equals(managementAddress)) { + // It's a management message + + handleManagementMessage(tx, message, direct); + } else { + result = doSend(tx, message, address, direct, noAutoCreateQueue); + } + return result; } @@ -1615,16 +1611,15 @@ public class ServerSessionImpl implements ServerSession, FailureListener { } @Override - public RoutingStatus doSend(final Transaction tx, - final Message msg, - final SimpleString originalAddress, - final boolean direct, - final boolean noAutoCreateQueue) throws Exception { + public synchronized RoutingStatus doSend(final Transaction tx, + final Message msg, + final SimpleString originalAddress, + final boolean direct, + final boolean noAutoCreateQueue) throws Exception { - synchronized (sendLock) { - RoutingStatus result = RoutingStatus.OK; + RoutingStatus result = RoutingStatus.OK; - RoutingType routingType = msg.getRouteType(); + RoutingType routingType = msg.getRouteType(); /* TODO-now: How to address here with AMQP? if (originalAddress != null) { @@ -1635,43 +1630,42 @@ public class ServerSessionImpl implements ServerSession, FailureListener { } } */ - Pair art = getAddressAndRoutingType(msg.getAddressSimpleString(), routingType); + Pair art = getAddressAndRoutingType(msg.getAddressSimpleString(), routingType); - // Consumer - // check the user has write access to this address. - try { - securityCheck(art.getA(), CheckType.SEND, this); - } catch (ActiveMQException e) { - if (!autoCommitSends && tx != null) { - tx.markAsRollbackOnly(e); - } - throw e; + // Consumer + // check the user has write access to this address. + try { + securityCheck(art.getA(), CheckType.SEND, this); + } catch (ActiveMQException e) { + if (!autoCommitSends && tx != null) { + tx.markAsRollbackOnly(e); } - - if (tx == null || autoCommitSends) { - } else { - routingContext.setTransaction(tx); - } - - try { - routingContext.setAddress(art.getA()); - routingContext.setRoutingType(art.getB()); - - result = postOffice.route(msg, routingContext, direct); - - Pair value = targetAddressInfos.get(msg.getAddressSimpleString()); - - if (value == null) { - targetAddressInfos.put(msg.getAddressSimpleString(), new Pair<>(msg.getUserID(), new AtomicLong(1))); - } else { - value.setA(msg.getUserID()); - value.getB().incrementAndGet(); - } - } finally { - routingContext.clear(); - } - return result; + throw e; } + + if (tx == null || autoCommitSends) { + } else { + routingContext.setTransaction(tx); + } + + try { + routingContext.setAddress(art.getA()); + routingContext.setRoutingType(art.getB()); + + result = postOffice.route(msg, routingContext, direct); + + Pair value = targetAddressInfos.get(msg.getAddressSimpleString()); + + if (value == null) { + targetAddressInfos.put(msg.getAddressSimpleString(), new Pair<>(msg.getUserID(), new AtomicLong(1))); + } else { + value.setA(msg.getUserID()); + value.getB().incrementAndGet(); + } + } finally { + routingContext.clear(); + } + return result; } @Override @@ -1699,7 +1693,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener { @Override public Pair getAddressAndRoutingType(SimpleString address, - RoutingType defaultRoutingType) { + RoutingType defaultRoutingType) { if (prefixEnabled) { return PrefixUtil.getAddressAndRoutingType(address, defaultRoutingType, prefixes); } @@ -1708,7 +1702,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener { @Override public Pair> getAddressAndRoutingTypes(SimpleString address, - Set defaultRoutingTypes) { + Set defaultRoutingTypes) { if (prefixEnabled) { return PrefixUtil.getAddressAndRoutingTypes(address, defaultRoutingTypes, prefixes); }