mirror of https://github.com/apache/activemq.git
transports copy messages after serializing them on to the wire. So move copy on send into
the vm:// transport only git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@513920 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
819b811a2e
commit
55fa95460c
|
@ -1585,9 +1585,6 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
|
||||||
message.setJMSMessageID(msg.getMessageId().toString());
|
message.setJMSMessageID(msg.getMessageId().toString());
|
||||||
}
|
}
|
||||||
msg.setTransactionId(txid);
|
msg.setTransactionId(txid);
|
||||||
if(connection.isCopyMessageOnSend()){
|
|
||||||
msg=(ActiveMQMessage)msg.copy();
|
|
||||||
}
|
|
||||||
msg.setConnection(connection);
|
msg.setConnection(connection);
|
||||||
msg.onSend();
|
msg.onSend();
|
||||||
msg.setProducerId(msg.getMessageId().getProducerId());
|
msg.setProducerId(msg.getMessageId().getProducerId());
|
||||||
|
|
|
@ -23,6 +23,7 @@ import java.util.List;
|
||||||
import java.util.concurrent.LinkedBlockingQueue;
|
import java.util.concurrent.LinkedBlockingQueue;
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
import org.apache.activemq.command.Command;
|
import org.apache.activemq.command.Command;
|
||||||
|
import org.apache.activemq.command.Message;
|
||||||
import org.apache.activemq.thread.Task;
|
import org.apache.activemq.thread.Task;
|
||||||
import org.apache.activemq.thread.TaskRunner;
|
import org.apache.activemq.thread.TaskRunner;
|
||||||
import org.apache.activemq.thread.TaskRunnerFactory;
|
import org.apache.activemq.thread.TaskRunnerFactory;
|
||||||
|
@ -73,6 +74,9 @@ public class VMTransport implements Transport,Task{
|
||||||
}
|
}
|
||||||
|
|
||||||
public void oneway(Object command) throws IOException{
|
public void oneway(Object command) throws IOException{
|
||||||
|
if (command instanceof Message) {
|
||||||
|
command = ((Message)command).copy();
|
||||||
|
}
|
||||||
if(disposed){
|
if(disposed){
|
||||||
throw new TransportDisposedIOException("Transport disposed.");
|
throw new TransportDisposedIOException("Transport disposed.");
|
||||||
}
|
}
|
||||||
|
@ -90,6 +94,9 @@ public class VMTransport implements Transport,Task{
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void syncOneWay(Object command){
|
protected void syncOneWay(Object command){
|
||||||
|
if (command instanceof Message) {
|
||||||
|
command = ((Message)command).copy();
|
||||||
|
}
|
||||||
final TransportListener tl=peer.transportListener;
|
final TransportListener tl=peer.transportListener;
|
||||||
prePeerSetQueue=peer.prePeerSetQueue;
|
prePeerSetQueue=peer.prePeerSetQueue;
|
||||||
if(tl==null){
|
if(tl==null){
|
||||||
|
|
Loading…
Reference in New Issue