mirror of https://github.com/apache/activemq.git
Fix for https://issues.apache.org/activemq/browse/AMQ-1189, use 2 fields instead of 1 for the 2 usages of the consumer field to avoid CCE when using vm messaging.
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@515625 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
300a5b23cb
commit
f7db42a835
|
@ -27,6 +27,7 @@ import java.util.concurrent.CountDownLatch;
|
|||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import org.apache.activemq.Service;
|
||||
import org.apache.activemq.broker.ft.MasterBroker;
|
||||
import org.apache.activemq.broker.region.ConnectionStatistics;
|
||||
|
@ -713,7 +714,7 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit
|
|||
} else {
|
||||
if(message.isMessageDispatch()) {
|
||||
MessageDispatch md=(MessageDispatch) message;
|
||||
Runnable sub=(Runnable) md.getConsumer();
|
||||
Runnable sub=md.getTransmitCallback();
|
||||
broker.processDispatch(md);
|
||||
if(sub!=null){
|
||||
sub.run();
|
||||
|
@ -731,10 +732,9 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit
|
|||
|
||||
if(command.isMessageDispatch()){
|
||||
MessageDispatch md=(MessageDispatch) command;
|
||||
Runnable sub=md.getTransmitCallback();
|
||||
broker.processDispatch(md);
|
||||
Object consumer = md.getConsumer();
|
||||
if (consumer instanceof Runnable) {
|
||||
Runnable sub=(Runnable) consumer;
|
||||
if(sub!=null){
|
||||
sub.run();
|
||||
}
|
||||
}
|
||||
|
@ -875,7 +875,7 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit
|
|||
Command command = (Command) iter.next();
|
||||
if(command.isMessageDispatch()) {
|
||||
MessageDispatch md=(MessageDispatch) command;
|
||||
Runnable sub=(Runnable) md.getConsumer();
|
||||
Runnable sub=md.getTransmitCallback();
|
||||
broker.processDispatch(md);
|
||||
if(sub!=null){
|
||||
sub.run();
|
||||
|
|
|
@ -429,7 +429,7 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
|
|||
prefetchExtension=Math.max(0,prefetchExtension-1);
|
||||
}
|
||||
if(info.isDispatchAsync()){
|
||||
md.setConsumer(new Runnable(){
|
||||
md.setTransmitCallback(new Runnable(){
|
||||
|
||||
public void run(){
|
||||
// Since the message gets queued up in async dispatch, we don't want to
|
||||
|
|
|
@ -396,7 +396,7 @@ public class TopicSubscription extends AbstractSubscription{
|
|||
}
|
||||
}
|
||||
if(info.isDispatchAsync()){
|
||||
md.setConsumer(new Runnable(){
|
||||
md.setTransmitCallback(new Runnable(){
|
||||
|
||||
public void run(){
|
||||
node.getRegionDestination().getDestinationStatistics().getDispatched().increment();
|
||||
|
|
|
@ -36,6 +36,7 @@ public class MessageDispatch extends BaseCommand {
|
|||
|
||||
transient protected long deliverySequenceId;
|
||||
transient protected Object consumer;
|
||||
transient protected Runnable transmitCallback;
|
||||
|
||||
public byte getDataStructureType() {
|
||||
return DATA_STRUCTURE_TYPE;
|
||||
|
@ -103,5 +104,13 @@ public class MessageDispatch extends BaseCommand {
|
|||
public Response visit(CommandVisitor visitor) throws Exception {
|
||||
return null;
|
||||
}
|
||||
|
||||
public Runnable getTransmitCallback() {
|
||||
return transmitCallback;
|
||||
}
|
||||
|
||||
public void setTransmitCallback(Runnable transmitCallback) {
|
||||
this.transmitCallback = transmitCallback;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue