mirror of https://github.com/apache/activemq.git
Enure reference count for MessageReference is always updated - if dispatched sync or async
git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@381328 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
c628313c62
commit
75af5faa5a
|
@ -136,7 +136,7 @@ public abstract class AbstractConnection implements Service, Connection, Task, C
|
|||
}
|
||||
|
||||
public void start() throws Exception {
|
||||
this.dispatch(connector.getBrokerInfo());
|
||||
this.processDispatch(connector.getBrokerInfo());
|
||||
}
|
||||
|
||||
public void stop() throws Exception {
|
||||
|
@ -537,32 +537,11 @@ public abstract class AbstractConnection implements Service, Connection, Task, C
|
|||
return connector;
|
||||
}
|
||||
|
||||
public void dispatchSync(Command command) {
|
||||
|
||||
if( command.isMessageDispatch() ) {
|
||||
|
||||
MessageDispatch md = (MessageDispatch) command;
|
||||
Runnable sub = (Runnable) md.getConsumer();
|
||||
broker.processDispatch(md);
|
||||
|
||||
try {
|
||||
dispatch( command );
|
||||
} finally {
|
||||
if( sub != null ) {
|
||||
sub.run();
|
||||
}
|
||||
}
|
||||
|
||||
} else {
|
||||
dispatch( command );
|
||||
}
|
||||
public void dispatchSync(Command message) {
|
||||
processDispatch(message);
|
||||
}
|
||||
|
||||
public void dispatchAsync(Command message) {
|
||||
if (message.isMessageDispatch()){
|
||||
MessageDispatch md = (MessageDispatch) message;
|
||||
broker.processDispatch(md);
|
||||
}
|
||||
if( taskRunner==null ) {
|
||||
dispatchSync( message );
|
||||
} else {
|
||||
|
@ -575,12 +554,29 @@ public abstract class AbstractConnection implements Service, Connection, Task, C
|
|||
}
|
||||
}
|
||||
|
||||
protected void processDispatch(Command command){
|
||||
if(command.isMessageDispatch()){
|
||||
MessageDispatch md=(MessageDispatch) command;
|
||||
Runnable sub=(Runnable) md.getConsumer();
|
||||
broker.processDispatch(md);
|
||||
try{
|
||||
dispatch(command);
|
||||
}finally{
|
||||
if(sub!=null){
|
||||
sub.run();
|
||||
}
|
||||
}
|
||||
}else{
|
||||
dispatch(command);
|
||||
}
|
||||
}
|
||||
|
||||
public boolean iterate() {
|
||||
if( dispatchQueue.isEmpty() ) {
|
||||
return false;
|
||||
} else {
|
||||
Command command = (Command) dispatchQueue.remove(0);
|
||||
dispatch( command );
|
||||
processDispatch( command );
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -21,6 +21,7 @@ import java.io.IOException;
|
|||
import org.apache.activemq.broker.ft.MasterBroker;
|
||||
import org.apache.activemq.command.BrokerInfo;
|
||||
import org.apache.activemq.command.Command;
|
||||
import org.apache.activemq.command.MessageDispatch;
|
||||
import org.apache.activemq.command.Response;
|
||||
import org.apache.activemq.command.ShutdownInfo;
|
||||
import org.apache.activemq.thread.TaskRunnerFactory;
|
||||
|
@ -192,20 +193,17 @@ public class TransportConnection extends AbstractConnection {
|
|||
this.active=active;
|
||||
}
|
||||
|
||||
public Response processBrokerInfo(BrokerInfo info) {
|
||||
if (info.isSlaveBroker()){
|
||||
public Response processBrokerInfo(BrokerInfo info){
|
||||
if(info.isSlaveBroker()){
|
||||
//stream messages from this broker (the master) to
|
||||
//the slave
|
||||
MutableBrokerFilter parent = (MutableBrokerFilter)broker.getAdaptor(MutableBrokerFilter.class);
|
||||
masterBroker = new MasterBroker(parent,transport);
|
||||
MutableBrokerFilter parent=(MutableBrokerFilter) broker.getAdaptor(MutableBrokerFilter.class);
|
||||
masterBroker=new MasterBroker(parent,transport);
|
||||
masterBroker.startProcessing();
|
||||
log.info("Slave Broker " + info.getBrokerName() + " is attached");
|
||||
log.info("Slave Broker "+info.getBrokerName()+" is attached");
|
||||
}
|
||||
|
||||
return super.processBrokerInfo(info);
|
||||
}
|
||||
|
||||
|
||||
|
||||
protected void dispatch(Command command){
|
||||
try{
|
||||
|
@ -217,6 +215,5 @@ public class TransportConnection extends AbstractConnection {
|
|||
}finally{
|
||||
setMarkedCandidate(false);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue