From a24133e1e3c334ee092c1367cf51fdf31a459cd5 Mon Sep 17 00:00:00 2001 From: "Hiram R. Chirino" Date: Thu, 1 Oct 2009 16:46:37 +0000 Subject: [PATCH] AMQ-2435: NullPointer Exception Occurs when using producer flow control When producer window based flow control kicks in now, we copy the context since it will be changed while the message send request is waiting for space on the queue. git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@820713 13f79535-47bb-0310-9956-ffa450edef68 --- activemq-core/pom.xml | 4 +-- .../activemq/broker/ConnectionContext.java | 27 +++++++++++++++++++ .../broker/ProducerBrokerExchange.java | 12 +++++++++ .../apache/activemq/broker/region/Queue.java | 5 +++- 4 files changed, 45 insertions(+), 3 deletions(-) diff --git a/activemq-core/pom.xml b/activemq-core/pom.xml index fb3dd51d0b..09c7e6dd17 100755 --- a/activemq-core/pom.xml +++ b/activemq-core/pom.xml @@ -599,8 +599,8 @@ process-classes http://activemq.apache.org/schema/core - ${basedir}/target/classes/activemq.xsd - ${basedir}/target/classes + ${basedir}/target/generated-sources/xbean/activemq.xsd + ${basedir}/target/generated-sources/xbean false org.apache.activemq.broker.jmx.AnnotatedMBean,org.apache.activemq.broker.jmx.DestinationViewMBean diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java b/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java index f3e5fe6cf4..3dc530efed 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java @@ -73,6 +73,32 @@ public class ConnectionContext { setUserName(info.getUserName()); setConnectionId(info.getConnectionId()); } + + public ConnectionContext copy() { + ConnectionContext rc = new ConnectionContext(this.messageEvaluationContext); + rc.connection = this.connection; + rc.connector = this.connector; + rc.broker = this.broker; + rc.inRecoveryMode = this.inRecoveryMode; + rc.transaction = this.transaction; + rc.transactions = this.transactions; + rc.securityContext = this.securityContext; + rc.connectionId = this.connectionId; + rc.clientId = this.clientId; + rc.userName = this.userName; + rc.haAware = this.haAware; + rc.wireFormatInfo = this.wireFormatInfo; + rc.longTermStoreContext = this.longTermStoreContext; + rc.producerFlowControl = this.producerFlowControl; + rc.messageAuthorizationPolicy = this.messageAuthorizationPolicy; + rc.networkConnection = this.networkConnection; + rc.faultTolerant = this.faultTolerant; + rc.stopping.set(this.stopping.get()); + rc.dontSendReponse = this.dontSendReponse; + rc.clientMaster = this.clientMaster; + return rc; + } + public SecurityContext getSecurityContext() { return securityContext; @@ -293,4 +319,5 @@ public class ConnectionContext { public void setFaultTolerant(boolean faultTolerant) { this.faultTolerant = faultTolerant; } + } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java b/activemq-core/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java index 32b3db8655..39710dfcca 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java @@ -36,6 +36,17 @@ public class ProducerBrokerExchange { public ProducerBrokerExchange() { } + public ProducerBrokerExchange copy() { + ProducerBrokerExchange rc = new ProducerBrokerExchange(); + rc.connectionContext = connectionContext.copy(); + rc.regionDestination = regionDestination; + rc.region = region; + rc.producerState = producerState; + rc.mutable = mutable; + return rc; + } + + /** * @return the connectionContext */ @@ -105,4 +116,5 @@ public class ProducerBrokerExchange { public void setProducerState(ProducerState producerState) { this.producerState = producerState; } + } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java index 7933a24ac8..6d0c4e7f6f 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -428,6 +428,9 @@ public class Queue extends BaseDestination implements Task, UsageListener { // a sync message or // if it is using a producer window if (producerInfo.getWindowSize() > 0 || message.isResponseRequired()) { + // copy the exchange state since the context will be modified while we are waiting + // for space. + final ProducerBrokerExchange producerExchangeCopy = producerExchange.copy(); synchronized (messagesWaitingForSpace) { messagesWaitingForSpace.add(new Runnable() { public void run() { @@ -439,7 +442,7 @@ public class Queue extends BaseDestination implements Task, UsageListener { broker.messageExpired(context, message); destinationStatistics.getExpired().increment(); } else { - doMessageSend(producerExchange, message); + doMessageSend(producerExchangeCopy, message); } if (sendProducerAck) {