AMQ-2435: NullPointer Exception Occurs when using producer flow control

When producer window based flow control kicks in now, we copy the context since it will be
 changed while the message send request is waiting for space on the queue.



git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@820713 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2009-10-01 16:46:37 +00:00
parent b89f0d4669
commit a24133e1e3
4 changed files with 45 additions and 3 deletions

View File

@ -599,8 +599,8 @@
<phase>process-classes</phase> <phase>process-classes</phase>
<configuration> <configuration>
<namespace>http://activemq.apache.org/schema/core</namespace> <namespace>http://activemq.apache.org/schema/core</namespace>
<schema>${basedir}/target/classes/activemq.xsd</schema> <schema>${basedir}/target/generated-sources/xbean/activemq.xsd</schema>
<outputDir>${basedir}/target/classes</outputDir> <outputDir>${basedir}/target/generated-sources/xbean</outputDir>
<generateSpringSchemasFile>false</generateSpringSchemasFile> <generateSpringSchemasFile>false</generateSpringSchemasFile>
<excludedClasses>org.apache.activemq.broker.jmx.AnnotatedMBean,org.apache.activemq.broker.jmx.DestinationViewMBean</excludedClasses> <excludedClasses>org.apache.activemq.broker.jmx.AnnotatedMBean,org.apache.activemq.broker.jmx.DestinationViewMBean</excludedClasses>
</configuration> </configuration>

View File

@ -74,6 +74,32 @@ public class ConnectionContext {
setConnectionId(info.getConnectionId()); setConnectionId(info.getConnectionId());
} }
public ConnectionContext copy() {
ConnectionContext rc = new ConnectionContext(this.messageEvaluationContext);
rc.connection = this.connection;
rc.connector = this.connector;
rc.broker = this.broker;
rc.inRecoveryMode = this.inRecoveryMode;
rc.transaction = this.transaction;
rc.transactions = this.transactions;
rc.securityContext = this.securityContext;
rc.connectionId = this.connectionId;
rc.clientId = this.clientId;
rc.userName = this.userName;
rc.haAware = this.haAware;
rc.wireFormatInfo = this.wireFormatInfo;
rc.longTermStoreContext = this.longTermStoreContext;
rc.producerFlowControl = this.producerFlowControl;
rc.messageAuthorizationPolicy = this.messageAuthorizationPolicy;
rc.networkConnection = this.networkConnection;
rc.faultTolerant = this.faultTolerant;
rc.stopping.set(this.stopping.get());
rc.dontSendReponse = this.dontSendReponse;
rc.clientMaster = this.clientMaster;
return rc;
}
public SecurityContext getSecurityContext() { public SecurityContext getSecurityContext() {
return securityContext; return securityContext;
} }
@ -293,4 +319,5 @@ public class ConnectionContext {
public void setFaultTolerant(boolean faultTolerant) { public void setFaultTolerant(boolean faultTolerant) {
this.faultTolerant = faultTolerant; this.faultTolerant = faultTolerant;
} }
} }

View File

@ -36,6 +36,17 @@ public class ProducerBrokerExchange {
public ProducerBrokerExchange() { public ProducerBrokerExchange() {
} }
public ProducerBrokerExchange copy() {
ProducerBrokerExchange rc = new ProducerBrokerExchange();
rc.connectionContext = connectionContext.copy();
rc.regionDestination = regionDestination;
rc.region = region;
rc.producerState = producerState;
rc.mutable = mutable;
return rc;
}
/** /**
* @return the connectionContext * @return the connectionContext
*/ */
@ -105,4 +116,5 @@ public class ProducerBrokerExchange {
public void setProducerState(ProducerState producerState) { public void setProducerState(ProducerState producerState) {
this.producerState = producerState; this.producerState = producerState;
} }
} }

View File

@ -428,6 +428,9 @@ public class Queue extends BaseDestination implements Task, UsageListener {
// a sync message or // a sync message or
// if it is using a producer window // if it is using a producer window
if (producerInfo.getWindowSize() > 0 || message.isResponseRequired()) { if (producerInfo.getWindowSize() > 0 || message.isResponseRequired()) {
// copy the exchange state since the context will be modified while we are waiting
// for space.
final ProducerBrokerExchange producerExchangeCopy = producerExchange.copy();
synchronized (messagesWaitingForSpace) { synchronized (messagesWaitingForSpace) {
messagesWaitingForSpace.add(new Runnable() { messagesWaitingForSpace.add(new Runnable() {
public void run() { public void run() {
@ -439,7 +442,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
broker.messageExpired(context, message); broker.messageExpired(context, message);
destinationStatistics.getExpired().increment(); destinationStatistics.getExpired().increment();
} else { } else {
doMessageSend(producerExchange, message); doMessageSend(producerExchangeCopy, message);
} }
if (sendProducerAck) { if (sendProducerAck) {