diff --git a/activemq-core/pom.xml b/activemq-core/pom.xml index fb3dd51d0b..09c7e6dd17 100755 --- a/activemq-core/pom.xml +++ b/activemq-core/pom.xml @@ -599,8 +599,8 @@ process-classes http://activemq.apache.org/schema/core - ${basedir}/target/classes/activemq.xsd - ${basedir}/target/classes + ${basedir}/target/generated-sources/xbean/activemq.xsd + ${basedir}/target/generated-sources/xbean false org.apache.activemq.broker.jmx.AnnotatedMBean,org.apache.activemq.broker.jmx.DestinationViewMBean diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java b/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java index f3e5fe6cf4..3dc530efed 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java @@ -73,6 +73,32 @@ public class ConnectionContext { setUserName(info.getUserName()); setConnectionId(info.getConnectionId()); } + + public ConnectionContext copy() { + ConnectionContext rc = new ConnectionContext(this.messageEvaluationContext); + rc.connection = this.connection; + rc.connector = this.connector; + rc.broker = this.broker; + rc.inRecoveryMode = this.inRecoveryMode; + rc.transaction = this.transaction; + rc.transactions = this.transactions; + rc.securityContext = this.securityContext; + rc.connectionId = this.connectionId; + rc.clientId = this.clientId; + rc.userName = this.userName; + rc.haAware = this.haAware; + rc.wireFormatInfo = this.wireFormatInfo; + rc.longTermStoreContext = this.longTermStoreContext; + rc.producerFlowControl = this.producerFlowControl; + rc.messageAuthorizationPolicy = this.messageAuthorizationPolicy; + rc.networkConnection = this.networkConnection; + rc.faultTolerant = this.faultTolerant; + rc.stopping.set(this.stopping.get()); + rc.dontSendReponse = this.dontSendReponse; + rc.clientMaster = this.clientMaster; + return rc; + } + public SecurityContext getSecurityContext() { return securityContext; @@ -293,4 +319,5 @@ public class ConnectionContext { public void setFaultTolerant(boolean faultTolerant) { this.faultTolerant = faultTolerant; } + } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java b/activemq-core/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java index 32b3db8655..39710dfcca 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java @@ -36,6 +36,17 @@ public class ProducerBrokerExchange { public ProducerBrokerExchange() { } + public ProducerBrokerExchange copy() { + ProducerBrokerExchange rc = new ProducerBrokerExchange(); + rc.connectionContext = connectionContext.copy(); + rc.regionDestination = regionDestination; + rc.region = region; + rc.producerState = producerState; + rc.mutable = mutable; + return rc; + } + + /** * @return the connectionContext */ @@ -105,4 +116,5 @@ public class ProducerBrokerExchange { public void setProducerState(ProducerState producerState) { this.producerState = producerState; } + } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java index 7933a24ac8..6d0c4e7f6f 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -428,6 +428,9 @@ public class Queue extends BaseDestination implements Task, UsageListener { // a sync message or // if it is using a producer window if (producerInfo.getWindowSize() > 0 || message.isResponseRequired()) { + // copy the exchange state since the context will be modified while we are waiting + // for space. + final ProducerBrokerExchange producerExchangeCopy = producerExchange.copy(); synchronized (messagesWaitingForSpace) { messagesWaitingForSpace.add(new Runnable() { public void run() { @@ -439,7 +442,7 @@ public class Queue extends BaseDestination implements Task, UsageListener { broker.messageExpired(context, message); destinationStatistics.getExpired().increment(); } else { - doMessageSend(producerExchange, message); + doMessageSend(producerExchangeCopy, message); } if (sendProducerAck) {